

Amazon Timestream for LiveAnalytics に類似した機能をご希望の場合は Amazon Timestream for InfluxDB をご検討ください。リアルタイム分析に適した、シンプルなデータインジェストと 1 桁ミリ秒のクエリ応答時間を特徴としています。詳細については、[こちら](https://docs.aws.amazon.com//timestream/latest/developerguide/timestream-for-influxdb.html)を参照してください。

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

# Amazon MSK
<a name="MSK"></a>

## Managed Service for Apache Flink を使用して Timestream for LiveAnalytics に Amazon MSK データを送信する
<a name="msk-aka"></a>

Managed Service for Apache Flink のサンプル Timestream データコネクタと類似するデータコネクタを構築することで、Amazon MSK から Timestream にデータを送信できます。詳細については、「[Amazon Managed Service for Apache Flink](ApacheFlink.md)」を参照してください。

## Kafka Connect を使用して Amazon MSK データを Timestream for LiveAnalytics に送信する
<a name="msk-kafka-connect"></a>

Kafka Connect を使用して、時系列データを Amazon MSK から Timestream for LiveAnalytics に直接取り込むことができます。

Timestream 用のサンプル Kafka シンクコネクタを作成しました。また、データを Kafka トピックに発行するためのサンプル Apache JMeter テストプランを作成しました。これにより、データは Timestream Kafka シンクコネクタを介してトピックから Timestream for LiveAnalytics テーブルに流れます。これらのアーティファクトはすべて GitHub で入手できます。

**注記**  
Timestream Kafka シンクコネクタを使用するための推奨バージョンは、Java 11 です。複数の Java バージョンがある場合は、Java 11 を JAVA\$1HOME 環境変数にエクスポートしてください。

### サンプルアプリケーションを作成する
<a name="msk-kafka-connect-app"></a>

使用を開始するには、以下の手順を実行します。

1. Timestream for LiveAnalytics で、`kafkastream` という名前のデータベースを作成します。

   詳細については、手順「[データベースを作成する](console_timestream.md#console_timestream.db.using-console)」を参照してください。

1. Timestream for LiveAnalytics で、`purchase_history` という名前のテーブルを作成します。

   詳細については、手順「[テーブルを作成する](console_timestream.md#console_timestream.table.using-console)」を参照してください。

1. 共有されている手順に従って、以下を作成します。
   + Amazon MSK クラスター
   + Kafka プロデューサークライアントマシンとして設定されている Amazon EC2 インスタンス 
   + Kafka トピック

   詳細な手順については、kafka\$1ingestor プロジェクトの「[Prerequisites](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/tools/java/kafka_ingestor#prerequisites)」を参照してください。

1. [Timestream Kafka シンクコネクタ](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/integrations/kafka_connector)リポジトリのクローンを作成します。

   詳細な手順については、GitHub の「[Cloning a repository](https://docs.github.com/en/free-pro-team@latest/github/creating-cloning-and-archiving-repositories/cloning-a-repository)」を参照してください。

1. プラグインコードをコンパイルします。

    詳細な手順については、GitHub の「[Connector – Build from source](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/integrations/kafka_connector#connector---build-from-source)」を参照してください。

1. 説明されている手順に従って、次のファイルを S3 バケットにアップロードします。
   + `/target` ディレクトリの jar ファイル (kafka-connector-timestream->VERSION<-jar-with-dependencies.jar)
   + サンプル JSON スキーマファイル `purchase_history.json`。

   詳細な手順については、「*Amazon S3 ユーザーガイド*」の「[オブジェクトのアップロード](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html)」を参照してください。

1. VPC エンドポイントを 2 つ作成します。これらのエンドポイントは、MSK コネクタが AWS PrivateLink を使用してリソースにアクセスするために使用されます。
   + Amazon S3 バケットにアクセスするためのエンドポイント
   + Timestream for LiveAnalytics テーブルにアクセスするためのエンドポイント。

   詳細な手順については、「[VPC Endpoints](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/integrations/kafka_connector#vpc-endpoints)」を参照してください。

1. アップロードされた jar ファイルを使用してカスタムプラグインを作成します。

   詳細な手順については、「*Amazon MSK デベロッパーガイド*」の「[カスタムプラグインの作成](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-plugins.html)」を参照してください。

1. 「[Worker Configuration parameters](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/integrations/kafka_connector#worker-configuration-parameters)」で説明されている JSON コンテンツを使用し、説明されている手順に従ってカスタムワーカー設定を作成します。

   詳細な手順については、「*Amazon MSK デベロッパーガイド*」の[カスタムワーカー設定の作成](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-create-custom-worker-config)についてのドキュメントを参照してください。

1. サービス実行 IAM ロールを作成します。

   詳細な手順については、「[IAM Service Role](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/integrations/kafka_connector#iam-service-role)」を参照してください。

1. 前のステップで作成したカスタムプラグイン、カスタムワーカー設定、サービス実行 IAM ロールと、[サンプルコネクタ設定](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/integrations/kafka_connector#sample-connector-configuration)を使用して Amazon MSK コネクタを作成します。

   詳細な手順については、「*Amazon MSK デベロッパーガイド*」の「[コネクタを理解する](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html#mkc-create-connector-intro)」を参照してください。

   以下の設定パラメータの値は、それぞれの値で更新してください。詳細については、「[Connector Configuration parameters](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/integrations/kafka_connector#connector-configuration-parameters)」を参照してください。
   + `aws.region`
   + `timestream.schema.s3.bucket.name`
   + `timestream.ingestion.endpoint`

   コネクタの作成が完了するまでに 5 分～10 分かかります。パイプラインの準備が完了すると、ステータスが `Running` に変わります。

1. 作成された Kafka トピックにデータを書き込むための継続的なメッセージのストリームを発行します。

   詳細な手順については、「[How to use it](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/tools/java/kafka_ingestor#how-to-use-it)」を参照してください。

1. 1 つ以上のクエリを実行して、データが Amazon MSK から MSK Connect、さらに Timestream for LiveAnalytics テーブルに送信されていることを確認します。

   詳細については、手順「[クエリを実行する](console_timestream.md#console_timestream.queries.using-console)」を参照してください。

#### その他のリソース
<a name="msk-kafka-connect-more-info"></a>

ブログ「[Real-time serverless data ingestion from your Kafka clusters into Amazon Timestream using Kafka Connect](https://aws.amazon.com/blogs/database/real-time-serverless-data-ingestion-from-your-kafka-clusters-into-amazon-timestream-using-kafka-connect/)」では、Timestream for LiveAnalytics Kafka シンクコネクタを使用したエンドツーエンドのパイプラインの設定について説明しています。これには、Apache JMeter テストプランを使用して数千のサンプルメッセージを Kafka トピックに発行する Kafka プロデューサークライアントマシンから、Timestream for LiveAnalytics テーブルの取り込まれたレコードの検証までが含まれます。