Apache Kafka - AWS IoT Core

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Apache Kafka

Apache Kafka (Kafka) アクションは、Amazon Managed Streaming for Apache Kafka (Amazon MSK)、Confluent Cloud などのサードパーティープロバイダーによって管理される Apache Kafka クラスター、またはセルフマネージド Apache Kafka クラスターに直接メッセージを送信します。Kafka ルールアクションを使用すると、IoT データを Kafka クラスターにルーティングできます。これにより、ストリーミング分析、データ統合、視覚化、ミッションクリティカルなビジネス用途など、さまざまな目的で高性能データパイプラインを構築できます。

注記

このトピックでは、Apache Kafka プラットフォームおよび関連概念について精通していることを前提としています。Apache Kafka の詳細については、「Apache Kafka」を参照してください。 MSKサーバーレスはサポートされていません。 MSKサーバーレスクラスターは、Apache Kafka ルールアクションが現在サポートしていないIAM認証を介してのみ実行できます。Confluent AWS IoT Core で を設定する方法の詳細については、「Confluent と を活用して IoT デバイスとデータ管理の課題を解決 AWS する」を参照してください。

要件

このルールアクションには、以下の要件があります。

  • が 、ec2:CreateNetworkInterfaceec2:DescribeNetworkInterfaces、、、、ec2:DescribeVpcAttribute、および ec2:DescribeSecurityGroupsオペレーションを実行するために引き受け AWS IoT ることができる ec2:CreateNetworkInterfacePermission ec2:DeleteNetworkInterface ec2:DescribeSubnets ec2:DescribeVpcsIAMロール。このロールは、Kafka ブローカーに到達するために、Amazon Virtual Private Cloud への伸縮自在なネットワークインターフェイスを作成および管理します。詳細については、「AWS IoT ルールに必要なアクセス許可の付与」を参照してください。

    AWS IoT コンソールでは、 がこのルールアクションを実行することを許可 AWS IoT Core するロールを選択または作成できます。

    ネットワークインターフェイスの詳細については、「Amazon EC2ユーザーガイド」の「Elastic Network Interface」を参照してください。

    指定したロールにアタッチされるポリシーは次の例のようになります。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:CreateNetworkInterface", "ec2:DescribeNetworkInterfaces", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:DescribeVpcAttribute", "ec2:DescribeSecurityGroups" ], "Resource": "*" } ] }
  • AWS Secrets Manager を使用して Kafka ブローカーへの接続に必要な認証情報を保存する場合は、 が secretsmanager:GetSecretValueおよび secretsmanager:DescribeSecretオペレーションを実行するために引き受け AWS IoT Core ることができる IAMロールを作成する必要があります。

    指定したロールにアタッチされるポリシーは次の例のようになります。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue", "secretsmanager:DescribeSecret" ], "Resource": [ "arn:aws:secretsmanager:region:123456789012:secret:kafka_client_truststore-*", "arn:aws:secretsmanager:region:123456789012:secret:kafka_keytab-*" ] } ] }
  • Amazon Virtual Private Cloud (Amazon ) 内で Apache Kafka クラスターを実行できますVPC。からパブリック Kafka クラスター AWS IoT にメッセージを転送するには、Amazon VPC送信先を作成し、サブネットで NATゲートウェイを使用する必要があります。 AWS IoT ルールエンジンは、VPC宛先にリストされている各サブネットにネットワークインターフェイスを作成し、トラフィックを に直接ルーティングしますVPC。VPC 送信先を作成すると、 AWS IoT ルールエンジンは自動的にVPCルールアクションを作成します。VPC ルールアクションの詳細については、「」を参照してください仮想プライベートクラウド (VPC) の送信先

  • カスタマーマネージド AWS KMS key (KMS キー) を使用して保管中のデータを暗号化する場合、サービスには発信者に代わってKMSキーを使用するアクセス許可が必要です。詳細については、「Amazon Managed Streaming for Apache Kafka デベロッパーガイド」の「Amazon MSK暗号化」を参照してください。

パラメータ

このアクションで AWS IoT ルールを作成するときは、次の情報を指定する必要があります。

destinationArn

VPC 送信先の Amazon リソースネーム (ARN)。VPC 送信先の作成については、「」を参照してください仮想プライベートクラウド (VPC) の送信先

トピック

Kafka のブローカーに送信されるメッセージの Kafka のトピック。

このフィールドは、置換テンプレートを使用して置換できます。詳細については、「置換テンプレート」を参照してください。

キー (オプション)

Kafka のメッセージキー

このフィールドは、置換テンプレートを使用して置換できます。詳細については、「置換テンプレート」を参照してください。

ヘッダー (オプション)

指定した Kafka ヘッダーのリスト。各ヘッダーは、Kafka アクションを作成するときに指定できるキーと値のペア (1 つのキーと 1 つの値) です。これらのヘッダーを使用して、メッセージペイロードを変更せずに IoT クライアントからダウンストリーム Kafka クラスターにデータをルーティングできます。

このフィールドは、置換テンプレートを使用して置換できます。インラインルールの関数を Kafka Action のヘッダーで代替テンプレートとして渡す方法については、「」を参照してください。詳細については、「置換テンプレート」を参照してください。

注記

バイナリ形式のヘッダーはサポートされていません。

パーティション (オプション)

Kafka のメッセージパーティション。

このフィールドは、置換テンプレートを使用して置換できます。詳細については、「置換テンプレート」を参照してください。

clientProperties

Apache Kafka プロデューサークライアントのプロパティを定義するオブジェクト。

acks (オプション)

リクエストが完了したとみなされる前に、プロデューサーがサーバーに受信することを求める確認応答の数。

値として 0 を指定すると、プロデューサーはサーバーからの確認応答を待機しなくなります。サーバーがメッセージを受信しない場合、プロデューサーはメッセージの送信を再試行しません。

有効な値は、-101all です。デフォルト値は 1 です。

bootstrap.servers

Kafka クラスターへの初期接続を確立するために使用されるホストとポートのペア (host1:port1host2:port2 など) のリスト。

compression.type (optional)

プロデューサーによって生成されるすべてのデータの圧縮タイプ。

有効な値: nonegzipsnappylz4zstd。デフォルト値は none です。

security.protocol

Kafka ブローカーにアタッチするために使用されるセキュリティプロトコル。

有効な値: SSLSASL_SSL。デフォルト値は SSL です。

key.serializer

ProducerRecord で提供するキーオブジェクトをバイトに変換する方法を指定します。

有効な値: StringSerializer

value.serializer

ProducerRecord で提供する値オブジェクトをバイトに変換する方法を指定します。

有効な値: ByteBufferSerializer

ssl.truststore

base64 形式のトラストストアファイル、または AWS Secrets Manager 内のトラストストアファイルの場所。トラストストアが Amazon 認証機関 (CA) によって信頼されている場合は、この値は必須ではありません。

このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して Kafka ブローカーへの接続に必要な認証情報を保存する場合は、 get_secretSQL関数を使用してこのフィールドの値を取得できます。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secret SQL 関数の詳細については、「」を参照してくださいget_secret(secretId、secretType、キー、roleArn)。トラストストアがファイル形式の場合は、SecretBinary パラメータを使用します。トラストストアが文字列の形式である場合は、SecretString パラメータを使用します。

この値の最大サイズは 65 KB です。

ssl.truststore.password

信頼ストアのパスワード。この値は、トラストストアのパスワードを作成した場合にのみ必要です。

ssl.keystore

キーストアファイル。security.protocol の値として SSL を指定する場合、この値は必須です。

このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、 get_secretSQL関数を使用します。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secret SQL 関数の詳細については、「」を参照してくださいget_secret(secretId、secretType、キー、roleArn)SecretBinary パラメータを使用します。

ssl.keystore.password

キーストアファイルのストアパスワード。ssl.keystore の値を指定している場合、この値は必須です。

このフィールドの値はプレーンテキストにすることができます。このフィールドは、代替テンプレートもサポートします。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、 get_secretSQL関数を使用します。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secret SQL 関数の詳細については、「」を参照してくださいget_secret(secretId、secretType、キー、roleArn)SecretString パラメータを使用します。

ssl.key.password

キーストアファイル内のプライベートキーのパスワード。

このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、 get_secretSQL関数を使用します。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secret SQL 関数の詳細については、「」を参照してくださいget_secret(secretId、secretType、キー、roleArn)SecretString パラメータを使用します。

sasl.mechanism

Kafka のブローカーに接続するために使用されるセキュリティメカニズム。この値は、security.protocolSASL_SSL を指定する場合に必要です。

有効な値: PLAINSCRAM-SHA-512GSSAPI

注記

SCRAM-SHA-512 は、cn-north-1、cn-northwest-1、 us-gov-east-1、 us-gov-west-1 の各リージョンでサポートされている唯一のセキュリティメカニズムです。

sasl.plain.username

Secrets Manager からシークレット文字列を取得するために使用されるユーザー名。この値は、security.protocolSASL_SSL および sasl.mechanismPLAIN を指定する場合に必要です。

sasl.plain.password

Secrets Manager からシークレット文字列を取得するために使用されるパスワード。この値は、security.protocolSASL_SSL および sasl.mechanismPLAIN を指定する場合に必要です。

sasl.scram.username

Secrets Manager からシークレット文字列を取得するために使用されるユーザー名。この値は、security.protocolSASL_SSL および sasl.mechanismSCRAM-SHA-512 を指定する場合に必要です。

sasl.scram.password

Secrets Manager からシークレット文字列を取得するために使用されるパスワード。この値は、security.protocolSASL_SSL および sasl.mechanismSCRAM-SHA-512 を指定する場合に必要です。

sasl.kerberos.keytab

Secrets Manager の Kerberos 認証用のキータブファイル。この値は、security.protocolSASL_SSL および sasl.mechanismGSSAPI を指定する場合に必要です。

このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、 get_secretSQL関数を使用します。置換テンプレートの詳細については、「置換テンプレート」を参照してください。get_secret SQL 関数の詳細については、「」を参照してくださいget_secret(secretId、secretType、キー、roleArn)SecretBinaryパラメータを使用します。

sasl.kerberos.service.name

Apache Kafka が実行される Kerberos プリンシパル名。この値は、security.protocolSASL_SSL および sasl.mechanismGSSAPI を指定する場合に必要です。

sasl.kerberos.krb5.kdc

Apache Kafka プロデューサークライアントが接続するキーディストリビューションセンター (KDC) のホスト名。この値は、security.protocolSASL_SSL および sasl.mechanismGSSAPI を指定する場合に必要です。

sasl.kerberos.krb5.realm

Apache Kafka プロデューサークライアントが接続する領域。この値は、security.protocolSASL_SSL および sasl.mechanismGSSAPI を指定する場合に必要です。

sasl.kerberos.principal

Kerberos 対応サービスにアクセスするためのチケットを Kerberos が割り当てることができる一意の Kerberos ID。この値は、security.protocolSASL_SSL および sasl.mechanismGSSAPI を指定する場合に必要です。

次のJSON例では、 AWS IoT ルールで Apache Kafka アクションを定義します。次の例では、 sourceIp() インライン関数を Kafka アクションヘッダーの置換テンプレートとして渡します。

{ "topicRulePayload": { "sql": "SELECT * FROM 'some/topic'", "ruleDisabled": false, "awsIotSqlVersion": "2016-03-23", "actions": [ { "kafka": { "destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN", "topic": "TopicName", "clientProperties": { "bootstrap.servers": "kafka.com:9092", "security.protocol": "SASL_SSL", "ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "ssl.truststore.password": "kafka password", "sasl.mechanism": "GSSAPI", "sasl.kerberos.service.name": "kafka", "sasl.kerberos.krb5.kdc": "kerberosdns.com", "sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}", "sasl.kerberos.krb5.realm": "KERBEROSREALM", "sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com" }, "headers": [ { "key": "static_header_key", "value": "static_header_value" }, { "key": "substitutable_header_key", "value": "${value_from_payload}" }, { "key": "source_ip", "value": "${sourceIp()}" } ] } } ] } }

Kerberos セットアップに関する重要な注意事項

  • キー分散センター (KDC) は、ターゲット 内のプライベートドメインネームシステム (DNS) を介して解決可能である必要がありますVPC。考えられる方法の 1 つは、KDCDNSエントリをプライベートホストゾーンに追加することです。このアプローチの詳細については、「プライベートホストゾーンの使用」を参照してください。

  • 各 でDNS解像度が有効になっていVPCる必要があります。詳細については、「 DNSで を使用するVPC」を参照してください。

  • VPC 送信先のネットワークインターフェイスセキュリティグループとインスタンスレベルのセキュリティグループは、次のポートVPCの 内からのトラフィックを許可する必要があります。

    • TCP ブートストラップブローカーリスナーポートの トラフィック (多くの場合 9092 ですが、9000~9100 の範囲内である必要があります)

    • TCP のポート 88 の および UDPトラフィック KDC

  • SCRAM-SHA-512 は、cn-north-1、cn-northwest-1、 us-gov-east-1、 us-gov-west-1 の各リージョンでサポートされている唯一のセキュリティメカニズムです。