

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Apache Kafka
<a name="apache-kafka-rule-action"></a>

Apache Kafka (Kafka) アクションは、[Amazon Managed Streaming for Apache Kafka](https://docs.aws.amazon.com//msk/latest/developerguide/what-is-msk.html) (Amazon MSK)、[Confluent Cloud](https://www.confluent.io/) などのサードパーティープロバイダーによって管理される Apache Kafka クラスター、またはセルフマネージド Apache Kafka クラスターに直接メッセージを送信します。Kafka ルールアクションを使用すると、IoT データを Kafka クラスターにルーティングできます。これにより、ストリーミング分析、データ統合、視覚化、ミッションクリティカルなビジネス用途など、さまざまな目的で高性能データパイプラインを構築できます。

**注記**  
このトピックでは、Apache Kafka プラットフォームおよび関連概念について精通していることを前提としています。Apache Kafka の詳細については、「[Apache Kafka](https://kafka.apache.org/)」を参照してください。[MSK Serverless](https://docs.aws.amazon.com//msk/latest/developerguide/serverless.html) はサポートされていません。MSK Serverless クラスターは、Apache Kafka ルールアクションが現在サポートしていない IAM 認証を介してのみ実行できます。Confluent AWS IoT Core で を設定する方法の詳細については、[「Confluent の活用」およびIoT デバイスとデータ管理の課題の解決 AWS](https://aws.amazon.com/blogs/apn/leveraging-confluent-and-aws-to-solve-iot-device-and-data-management-challenges/)」を参照してください。

## 要件
<a name="apache-kafka-rule-action-requirements"></a>

このルールアクションには、以下の要件があります。
+ が 、`ec2:CreateNetworkInterface`、`ec2:DescribeNetworkInterfaces`、、、`ec2:CreateNetworkInterfacePermission``ec2:DeleteNetworkInterface`、`ec2:DescribeVpcAttribute`、および `ec2:DescribeSecurityGroups`オペレーションを実行するために引き受け AWS IoT ることができる IAM `ec2:DescribeSubnets` `ec2:DescribeVpcs`ロール。このロールは、Kafka ブローカーに到達するために、Amazon Virtual Private Cloud への伸縮自在なネットワークインターフェイスを作成および管理します。詳細については、「[必要なアクセスを AWS IoT ルールに付与する](iot-create-role.md)」を参照してください。

   AWS IoT コンソールで、このルールアクションを実行することを に許可 AWS IoT Core するロールを選択または作成できます。

  ネットワークインターフェイスの詳細については、*Amazon EC2 ユーザーガイド*の「[Elastic Network Interface](https://docs.aws.amazon.com/AWSEC2/latest/UserGuide/using-eni.html)」を参照してください。

  指定したロールにアタッチされるポリシーは次の例のようになります。  
****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
      {
          "Effect": "Allow",
          "Action": [
              "ec2:CreateNetworkInterface",
              "ec2:DescribeNetworkInterfaces",
              "ec2:CreateNetworkInterfacePermission",
              "ec2:DeleteNetworkInterface",
              "ec2:DescribeSubnets",
              "ec2:DescribeVpcs",
              "ec2:DescribeVpcAttribute",
              "ec2:DescribeSecurityGroups"
              ],
              "Resource": "*"
          }
      ]
  }
  ```
+  AWS Secrets Manager を使用して Kafka ブローカーへの接続に必要な認証情報を保存する場合は、 が `secretsmanager:GetSecretValue`および `secretsmanager:DescribeSecret`オペレーションを実行するために引き受け AWS IoT Core ることができる IAM ロールを作成する必要があります。

  指定したロールにアタッチされるポリシーは次の例のようになります。  
****  

  ```
  {
      "Version":"2012-10-17",		 	 	 
      "Statement": [
          {
              "Effect": "Allow",
              "Action": [
                  "secretsmanager:GetSecretValue",
                  "secretsmanager:DescribeSecret"
              ],
              "Resource": [
                  "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka_client_truststore-*",
                  "arn:aws:secretsmanager:us-east-1:123456789012:secret:kafka_keytab-*"
              ]
          }
      ]
  }
  ```
+ Amazon Virtual Private Cloud (Amazon VPC) 内で Apache Kafka クラスターを実行できます。Apache Kafka Virtual Private Cloud (VPC) 送信先を作成し、サブネットで NAT ゲートウェイを使用して からパブリック Kafka クラスター AWS IoT にメッセージを転送する必要があります。 AWS IoT ルールエンジンは、送信先に記載されている各サブネットにネットワークインターフェイスを作成し、トラフィックを VPC に直接ルーティングします。送信先を指定すると、 AWS IoT ルールエンジンによって VPC ルールアクションが自動的に作成されます。VPC ルールアクションの詳細については、[Apache Kafka Virtual Private Cloud (VPC) の送信先](kafka-vpc-destination.md) を参照してください。
+ カスタマーマネージド AWS KMS key (KMS キー) を使用して保管中のデータを暗号化する場合、サービスには発信者に代わって KMS キーを使用するアクセス許可が必要です。詳細については、*Amazon Managed Streaming for Apache Kafka デベロッパーガイド*の [Amazon MSK 暗号化](https://docs.aws.amazon.com/msk/latest/developerguide/msk-encryption.html)を参照してください。

## パラメータ
<a name="apache-kafka-rule-action-parameters"></a>

このアクションで AWS IoT ルールを作成するときは、次の情報を指定する必要があります。

destinationArn  
Apache Kafka Virtual Private Cloud (VPC) 送信先の Amazon リソースネーム (ARN)。送信先の作成の詳細については、「」を参照してください[Apache Kafka Virtual Private Cloud (VPC) の送信先](kafka-vpc-destination.md)。

トピック  
Kafka のブローカーに送信されるメッセージの Kafka のトピック。  
このフィールドは、置換テンプレートを使用して置換できます。詳細については、「[置換テンプレート](iot-substitution-templates.md)」を参照してください。

キー (オプション)  
Kafka のメッセージキー  
このフィールドは、置換テンプレートを使用して置換できます。詳細については、「[置換テンプレート](iot-substitution-templates.md)」を参照してください。

ヘッダー (オプション)  
指定した Kafka ヘッダーのリスト。各ヘッダーは、Kafka アクションを作成するときに指定できるキーと値のペア (1 つのキーと 1 つの値) です。これらのヘッダーを使用して、メッセージペイロードを変更せずに IoT クライアントからダウンストリーム Kafka クラスターにデータをルーティングできます。  
このフィールドは、置換テンプレートを使用して置換できます。インラインルールの関数を Kafka Action のヘッダーで代替テンプレートとして渡す方法については、「[例](#apache-kafka-rule-action-examples)」を参照してください。詳細については、「[置換テンプレート](iot-substitution-templates.md)」を参照してください。  
バイナリ形式のヘッダーはサポートされていません。

パーティション (オプション)  
Kafka のメッセージパーティション。  
このフィールドは、置換テンプレートを使用して置換できます。詳細については、「[置換テンプレート](iot-substitution-templates.md)」を参照してください。

clientProperties  
Apache Kafka プロデューサークライアントのプロパティを定義するオブジェクト。    
acks (オプション)  
リクエストが完了したとみなされる前に、プロデューサーがサーバーに受信することを求める確認応答の数。  
値として 0 を指定すると、プロデューサーはサーバーからの確認応答を待機しなくなります。サーバーがメッセージを受信しない場合、プロデューサーはメッセージの送信を再試行しません。  
有効な値は、`-1`、`0`、`1`、`all` です。デフォルト値は `1` です。  
bootstrap.servers  
Kafka クラスターへの初期接続を確立するために使用されるホストとポートのペア (`host1:port1`、`host2:port2` など) のリスト。  
compression.type (optional)  
プロデューサーによって生成されるすべてのデータの圧縮タイプ。  
有効な値: `none`、`gzip`、`snappy`、`lz4`、`zstd`。デフォルト値は `none` です。  
security.protocol  
Kafka ブローカーにアタッチするために使用されるセキュリティプロトコル。  
有効な値: `SSL`、`SASL_SSL`。デフォルト値は `SSL` です。  
key.serializer  
`ProducerRecord` で提供するキーオブジェクトをバイトに変換する方法を指定します。  
有効な値: `StringSerializer`。  
value.serializer  
`ProducerRecord` で提供する値オブジェクトをバイトに変換する方法を指定します。  
有効な値: `ByteBufferSerializer`。  
ssl.truststore  
base64 形式のトラストストアファイル、または [AWS Secrets Manager](https://docs.aws.amazon.com/secretsmanager/latest/userguide/) 内のトラストストアファイルの場所。トラストストアが Amazon 認証機関 (CA) によって信頼されている場合は、この値は必須ではありません。  
このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して Kafka ブローカーへの接続に必要な認証情報を保存する場合、`get_secret` SQL 関数を使用してこのフィールドの値を取得できます。置換テンプレートの詳細については、「[置換テンプレート](iot-substitution-templates.md)」を参照してください。`get_secret` SQL 関数の詳細については、「[get\$1secret(secretId, secretType, key, roleArn)](iot-sql-functions.md#iot-sql-function-get-secret)」を参照してください。トラストストアがファイル形式の場合は、`SecretBinary` パラメータを使用します。トラストストアが文字列の形式である場合は、`SecretString` パラメータを使用します。  
この値の最大サイズは 65 KB です。  
ssl.truststore.password  
信頼ストアのパスワード。この値は、トラストストアのパスワードを作成した場合にのみ必要です。  
ssl.keystore  
キーストアファイル。`security.protocol` の値として `SSL` を指定する場合、この値は必須です。  
このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、`get_secret` 関数を使用します。置換テンプレートの詳細については、「[置換テンプレート](iot-substitution-templates.md)」を参照してください。`get_secret` SQL 関数の詳細については、「[get\$1secret(secretId, secretType, key, roleArn)](iot-sql-functions.md#iot-sql-function-get-secret)」を参照してください。`SecretBinary` パラメータを使用します。  
ssl.keystore.password  
キーストアファイルのストアパスワード。`ssl.keystore` の値を指定している場合、この値は必須です。  
このフィールドの値はプレーンテキストにすることができます。このフィールドは、代替テンプレートもサポートします。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、`get_secret` 関数を使用します。置換テンプレートの詳細については、「[置換テンプレート](iot-substitution-templates.md)」を参照してください。`get_secret` SQL 関数の詳細については、「[get\$1secret(secretId, secretType, key, roleArn)](iot-sql-functions.md#iot-sql-function-get-secret)」を参照してください。`SecretString` パラメータを使用します。  
ssl.key.password  
キーストアファイル内のプライベートキーのパスワード。  
このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、`get_secret` 関数を使用します。置換テンプレートの詳細については、「[置換テンプレート](iot-substitution-templates.md)」を参照してください。`get_secret` SQL 関数の詳細については、「[get\$1secret(secretId, secretType, key, roleArn)](iot-sql-functions.md#iot-sql-function-get-secret)」を参照してください。`SecretString` パラメータを使用します。  
sasl.mechanism  
Kafka のブローカーに接続するために使用されるセキュリティメカニズム。この値は、`security.protocol` の `SASL_SSL` を指定する場合に必要です。  
有効な値: `PLAIN`、`SCRAM-SHA-512`、`GSSAPI`。  
`SCRAM-SHA-512` は、cn-north-1、cn-northwest-1、us-gov-east-1、および us-gov-west-1 リージョンでサポートされている唯一のセキュリティメカニズムです。  
sasl.plain.username  
Secrets Manager からシークレット文字列を取得するために使用されるユーザー名。この値は、`security.protocol` の `SASL_SSL` および `sasl.mechanism` の `PLAIN` を指定する場合に必要です。  
sasl.plain.password  
Secrets Manager からシークレット文字列を取得するために使用されるパスワード。この値は、`security.protocol` の `SASL_SSL` および `sasl.mechanism` の `PLAIN` を指定する場合に必要です。  
sasl.scram.username  
Secrets Manager からシークレット文字列を取得するために使用されるユーザー名。この値は、`security.protocol` の `SASL_SSL` および `sasl.mechanism` の `SCRAM-SHA-512` を指定する場合に必要です。  
sasl.scram.password  
Secrets Manager からシークレット文字列を取得するために使用されるパスワード。この値は、`security.protocol` の `SASL_SSL` および `sasl.mechanism` の `SCRAM-SHA-512` を指定する場合に必要です。  
sasl.kerberos.keytab  
Secrets Manager の Kerberos 認証用のキータブファイル。この値は、`security.protocol` の `SASL_SSL` および `sasl.mechanism` の `GSSAPI` を指定する場合に必要です。  
このフィールドは置換テンプレートをサポートしています。Secrets Manager を使用して、Kafka ブローカーへの接続に必要な認証情報を保存します。このフィールドの値を取得するには、`get_secret` 関数を使用します。置換テンプレートの詳細については、「[置換テンプレート](iot-substitution-templates.md)」を参照してください。`get_secret` SQL 関数の詳細については、「[get\$1secret(secretId, secretType, key, roleArn)](iot-sql-functions.md#iot-sql-function-get-secret)」を参照してください。`SecretBinary`パラメータを使用します。  
sasl.kerberos.service.name  
Apache Kafka が実行される Kerberos プリンシパル名。この値は、`security.protocol` の `SASL_SSL` および `sasl.mechanism` の `GSSAPI` を指定する場合に必要です。  
sasl.kerberos.krb5.kdc  
Apache Kafka プロデューサークライアントが接続するキー配布センター (KDC) のホスト名。この値は、`security.protocol` の `SASL_SSL` および `sasl.mechanism` の `GSSAPI` を指定する場合に必要です。  
sasl.kerberos.krb5.realm  
Apache Kafka プロデューサークライアントが接続する領域。この値は、`security.protocol` の `SASL_SSL` および `sasl.mechanism` の `GSSAPI` を指定する場合に必要です。  
sasl.kerberos.principal  
Kerberos 対応サービスにアクセスするためのチケットを Kerberos が割り当てることができる一意の Kerberos ID。この値は、`security.protocol` の `SASL_SSL` および `sasl.mechanism` の `GSSAPI` を指定する場合に必要です。

## 例
<a name="apache-kafka-rule-action-examples"></a>

次の JSON 例では、 AWS IoT ルールで Apache Kafka アクションを定義します。次の例では、[sourceIp ()](iot-sql-functions.md#iot-function-sourceip) インライン関数を Kafka Action ヘッダーの[代替テンプレート](https://docs.aws.amazon.com//iot/latest/developerguide/iot-substitution-templates.html)として渡します。

```
{
	"topicRulePayload": {
		"sql": "SELECT * FROM 'some/topic'",
		"ruleDisabled": false,
		"awsIotSqlVersion": "2016-03-23",
		"actions": [
			{
				"kafka": {
					"destinationArn": "arn:aws:iot:region:123456789012:ruledestination/vpc/VPCDestinationARN",
					"topic": "TopicName",
					"clientProperties": {
						"bootstrap.servers": "kafka.com:9092",
						"security.protocol": "SASL_SSL",
						"ssl.truststore": "${get_secret('kafka_client_truststore', 'SecretBinary','arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}",
						"ssl.truststore.password": "kafka password",
						"sasl.mechanism": "GSSAPI",
						"sasl.kerberos.service.name": "kafka",
						"sasl.kerberos.krb5.kdc": "kerberosdns.com",
						"sasl.kerberos.keytab": "${get_secret('kafka_keytab','SecretBinary', 'arn:aws:iam::123456789012:role/kafka-get-secret-role-name')}",
						"sasl.kerberos.krb5.realm": "KERBEROSREALM",
						"sasl.kerberos.principal": "kafka-keytab/kafka-keytab.com"
					},
					"headers": [
						{
							"key": "static_header_key",
							"value": "static_header_value"
						},
						{
							"key": "substitutable_header_key",
							"value": "${value_from_payload}"
						},
						{
							"key": "source_ip",
							"value": "${sourceIp()}"
						}
					]
				}
			}
		]
	}
}
```

**Kerberos セットアップに関する重要な注意事項**
+ キー配布センター (KDC) は、ターゲット VPC 内のプライベートドメインネームシステム (DNS) を介して解決可能である必要があります。考えられる方法の 1 つは、KDC DNS エントリをプライベートホストゾーンに追加することです。このアプローチの詳細については、「[プライベートホストゾーンの使用](https://docs.aws.amazon.com/Route53/latest/DeveloperGuide/hosted-zones-private.html)」を参照してください。
+ 各 VPC で DNS 解決が有効になっている必要があります。詳細については、「[Using DNS with Your VPC](https://docs.aws.amazon.com/vpc/latest/userguide/vpc-dns.html)」を参照してください。
+ VPC 送信先のネットワークインターフェイスセキュリティグループとインスタンスレベルのセキュリティグループは、次のポートで VPC 内からのトラフィックを許可する必要があります。
  + ブートストラップブローカーのリスナーポート上の TCP トラフィック (通常は 9092 ですが、9000～9100 の範囲内である必要があります)
  + KDC のポート 88 の TCP および UDP トラフィック
+ `SCRAM-SHA-512` は、cn-north-1、cn-northwest-1、us-gov-east-1、および us-gov-west-1 リージョンでサポートされている唯一のセキュリティメカニズムです。