

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon MSK クラスターの SASL/SCRAM 認証を設定する
<a name="msk-password-tutorial"></a>

 AWS Secrets Manager でシークレットを設定するには、Secrets Manager [ユーザーガイドの「シークレットの作成と取得](https://docs.aws.amazon.com/secretsmanager/latest/userguide/tutorials_basic.html)」チュートリアルに従います。 [AWS](https://docs.aws.amazon.com/secretsmanager/latest/userguide/intro.html)

Amazon MSK クラスターのシークレットを作成するときは、次の要件に注意してください。
+ シークレットタイプには、**他のタイプのシークレット (API キーなど)** を選択します。
+ シークレット名は、プレフィックス **AmazonMSK\$1** から始まる必要があります。
+ 既存のカスタム AWS KMS キーを使用するか、シークレットの新しいカスタム AWS KMS キーを作成する必要があります。Secrets Manager は、デフォルトでシークレットのデフォルト AWS KMS キーを使用します。
**重要**  
デフォルト AWS KMS キーで作成されたシークレットは、Amazon MSK クラスターでは使用できません。
+ **[プレーンテキスト]** オプションを使用してキーと値のペアを入力するには、サインイン認証情報データは次の形式である必要があります。

  ```
  {
    "username": "alice",
    "password": "alice-secret"
  }
  ```
+ シークレットの ARN (Amazon リソース名) 値をレコードします。
+ 
**重要**  
「[クラスターの適切なサイズ設定: 標準ブローカーあたりのパーティション数](bestpractices.md#partitions-per-broker)」で説明されている制限を超えるクラスターに Secrets Manager シークレットを関連付けることはできません。
+ を使用してシークレット AWS CLI を作成する場合は、 `kms-key-id`パラメータのキー ID または ARN を指定します。エイリアスは指定しないでください。
+ シークレットをクラスターに関連付けるには、Amazon MSK コンソールまたは [BatchAssociateScramSecret](https://docs.aws.amazon.com/msk/1.0/apireference/clusters-clusterarn-scram-secrets.html#BatchAssociateScramSecret) オペレーションのいずれかを使用します。
**重要**  
シークレットをクラスターに関連付けると、Amazon MSK はそのシークレットにリソースポリシーをアタッチします。これにより、定義したシークレット値にクラスターがアクセスして読み取ることができるようになります。このリソースポリシーは変更しないでください。変更すると、クラスターがシークレットにアクセスできなくなる可能性があります。シークレットリソースポリシーやシークレット暗号化に使用される KMS キーに変更を加えた場合は、必ずシークレットを MSK クラスターに再度関連付けてください。これにより、クラスターがシークレットに引き続きアクセスできるようになります。

  次の `BatchAssociateScramSecret` オペレーションの JSON 入力の例は、シークレットをクラスターに関連付けます。

  ```
  {
    "clusterArn" : "arn:aws:kafka:us-west-2:0123456789019:cluster/SalesCluster/abcd1234-abcd-cafe-abab-9876543210ab-4",          
    "secretArnList": [
      "arn:aws:secretsmanager:us-west-2:0123456789019:secret:AmazonMSK_MyClusterSecret"
    ]
  }
  ```

# サインイン認証情報を使用したクラスターへの接続
<a name="msk-password-tutorial-connect"></a>

シークレットを作成してクラスターに関連付けると、クライアントをクラスターに接続できます。次の手順は、SASL/SCRAM 認証を使用するクラスターにクライアントを接続する方法を示しています。また、トピックの例に対して生成および消費する方法も示します。

**Topics**
+ [SASL/SCRAM 認証を使用してクライアントをクラスターに接続する](#w2aab9c13c29c17c13c11b9b7)
+ [接続の問題のトラブルシューティング](#msk-password-tutorial-connect-troubleshooting)

## SASL/SCRAM 認証を使用してクライアントをクラスターに接続する
<a name="w2aab9c13c29c17c13c11b9b7"></a>

1.  AWS CLI がインストールされているマシンで次のコマンドを実行します。*cluster-ARN* を自身のクラスター ARN に置き換えます。

   ```
   aws kafka get-bootstrap-brokers --cluster-arn clusterARN
   ```

   このコマンドの JSON 結果から、`BootstrapBrokerStringSaslScram` という名前の文字列に関連付けられた値を保存します。この値は後のステップで使用します。

1. クライアントマシンで、シークレットに保存されているユーザー認証情報を含む JAAS 設定ファイルを作成します。例えば、ユーザー **alice** の場合、次の内容を含む `users_jaas.conf` という名前のファイルを作成します。

   ```
   KafkaClient {
      org.apache.kafka.common.security.scram.ScramLoginModule required
      username="alice"
      password="alice-secret";
   };
   ```

1. 次のコマンドを使用して、JAAS 設定ファイルを `KAFKA_OPTS` 環境パラメータとしてエクスポートします。

   ```
   export KAFKA_OPTS=-Djava.security.auth.login.config=<path-to-jaas-file>/users_jaas.conf
   ```

1. `/tmp` ディレクトリに `kafka.client.truststore.jks` という名前のファイルを作成します。

1. (オプション)次のコマンドを使用して、 JVM `cacerts` フォルダから JDK キーストアファイルを、前の手順で作成した`kafka.client.truststore.jks`ファイルにコピーします。*JDKFolder* は、インスタンス上の JDK フォルダの名前に置き換えてください。例えば、JDK フォルダには `java-1.8.0-openjdk-1.8.0.201.b09-0.amzn2.x86_64` という名前が付いている場合があります。

   ```
   cp /usr/lib/jvm/JDKFolder/lib/security/cacerts /tmp/kafka.client.truststore.jks
   ```

1. Apache Kafka のインストール済み環境の `bin` ディレクトリに、次の内容を含む `client_sasl.properties` という名前のクライアントプロパティファイルを作成します。このファイルは、SASL メカニズムとプロトコルを定義します。

   ```
   security.protocol=SASL_SSL
   sasl.mechanism=SCRAM-SHA-512
   ```

1. 例となるトピックを作成するには、次のコマンドを実行します。*BootstrapBrokerStringSaslScram* を、このトピックの手順 1 で取得したブートストラップブローカー文字列に置き換えます。

   ```
   <path-to-your-kafka-installation>/bin/kafka-topics.sh --create --bootstrap-server BootstrapBrokerStringSaslScram --command-config <path-to-client-properties>/client_sasl.properties --replication-factor 3 --partitions 1 --topic ExampleTopicName
   ```

1. 作成したサンプルトピックにデータを生成するには、クライアントマシンで次のコマンドを実行します。*BootstrapBrokerStringSaslScram* を、このトピックの手順 1 で取得したブートストラップブローカー文字列に置き換えます。

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-producer.sh --broker-list BootstrapBrokerStringSaslScram --topic ExampleTopicName --producer.config client_sasl.properties
   ```

1. 作成したトピックからデータを消費するには、クライアントマシンで次のコマンドを実行します。*BootstrapBrokerStringSaslScram* を、このトピックの手順 1 で取得したブートストラップブローカー文字列に置き換えます。

   ```
   <path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server BootstrapBrokerStringSaslScram --topic ExampleTopicName --from-beginning --consumer.config client_sasl.properties
   ```

## 接続の問題のトラブルシューティング
<a name="msk-password-tutorial-connect-troubleshooting"></a>

Kafka クライアントコマンドを実行する際、特に大規模なトピックやデータセットを操作する場合、Java ヒープメモリエラーが発生する可能性があります。これらのエラーは、Kafka ツールが Java アプリケーションとしてデフォルトのメモリ設定で実行されるため、ワークロードに対して不十分な場合があります。

`Out of Memory Java Heap` エラーを解決するには、`KAFKA_OPTS` 環境変数を変更してメモリ設定を含めることで、Java ヒープサイズを増やすことができます。

次の例では、ヒープの最大サイズを 1GB (`-Xmx1G`) に設定します。この値は、使用可能なシステムメモリと要件に基づいて調整できます。

```
export KAFKA_OPTS="-Djava.security.auth.login.config=<path-to-jaas-file>/users_jaas.conf -Xmx1G"
```

大規模なトピックを消費する場合、メモリ使用量を抑えるために、`--from-beginning` の代わりに時間ベースまたはオフセットベースのパラメータの使用を検討してください。

```
<path-to-your-kafka-installation>/bin/kafka-console-consumer.sh --bootstrap-server BootstrapBrokerStringSaslScram --topic ExampleTopicName --max-messages 1000 --consumer.config client_sasl.properties
```