

要获得与亚马逊 Timestream 类似的功能 LiveAnalytics，可以考虑适用于 InfluxDB 的亚马逊 Timestream。适用于 InfluxDB 的 Amazon Timestream 提供简化的数据摄取和个位数毫秒级的查询响应时间，以实现实时分析。点击[此处](https://docs.aws.amazon.com//timestream/latest/developerguide/timestream-for-influxdb.html)了解更多信息。

本文属于机器翻译版本。若本译文内容与英语原文存在差异，则一律以英文原文为准。

# Amazon MSK
<a name="MSK"></a>

## 使用适用于 Apache Flink 的托管服务，将 Amazon MSK 数据发送到适用于 LiveAnalytics 的 Timestream
<a name="msk-aka"></a>

通过构建数据连接器，该连接器类似适用于 Apache Flink 的托管服务的示例 Timestream 数据连接器，可将数据从 Amazon MSK 发送到 Timestream。有关更多信息，请参阅 [适用于 Apache Flink 的亚马逊托管服务](ApacheFlink.md)。

## 使用 Kafka Connect，将 Amazon MSK 数据发送到适用于 LiveAnalytics 的 Timestream
<a name="msk-kafka-connect"></a>

您可以使用 Kafka Connect，将时间序列数据直接从 Amazon MSK 摄取到适用于 LiveAnalytics 的 Timestream 中。

我们已为 Timestream 创建示例 Kafka Sink Connector。我们还已创建示例 Apache jMeter 测试计划，用于将数据发布到 Kafka 主题，以便数据通过 Timestream Kafka Sink Connector 从该主题流向适用于 LiveAnalytics 的 Timestream 表。所有这些构件均可在 GitHub 上获取。

**注意**  
Java 11 是使用 Timestream Kafka Sink Connector 时的推荐版本。如果有多个 Java 版本，请确保将 Java 11 导出至 JAVA\$1HOME 环境变量。

### 创建示例应用程序
<a name="msk-kafka-connect-app"></a>

要开始使用，请按照以下步骤进行。

1. 在适用于 LiveAnalytics 的 Timestream 中，创建名为 `kafkastream` 的数据库。

   有关详细说明，请参阅流程 [创建数据库](console_timestream.md#console_timestream.db.using-console)。

1. 在适用于 LiveAnalytics 的 Timestream 中，创建名为 `purchase_history` 的表。

   有关详细说明，请参阅流程 [创建表](console_timestream.md#console_timestream.table.using-console)。

1. 按照 中共享的说明创建以下内容：、和。
   + Amazon MSK 集群
   + 配置为 Kafka 生成器客户端计算机的 Amazon EC2 实例 
   + Kafka 主题

   有关详细说明，请参阅 kafka\$1ingestor 项目的[先决条件](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/tools/java/kafka_ingestor#prerequisites)。

1. 克隆 [Timestream Kafka Sink Connector](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/integrations/kafka_connector) 存储库。

   有关详细说明，请参阅 GitHub 上的[克隆存储库](https://docs.github.com/en/free-pro-team@latest/github/creating-cloning-and-archiving-repositories/cloning-a-repository)。

1. 编译插件代码。

    有关详细说明，请参阅 GitHub 上的[连接器 - 从源构建](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/integrations/kafka_connector#connector---build-from-source)。

1. 将以下文件上传到 S3 存储桶：按照 中所述的说明操作。
   + `/target` 目录中的 jar 文件（kafka-connector-timestream->VERSION<-jar-with-dependencies.jar）
   + 示例 json 架构文件，`purchase_history.json`。

   有关详细说明，请参阅《Amazon S3 用户指南》**中的[上传对象](https://docs.aws.amazon.com/AmazonS3/latest/userguide/upload-objects.html)。

1. 创建两个 VPC 端点。MSK 连接器将使用这些端点通过 AWS PrivateLink 访问相关资源。
   + 一个端点用于访问 Amazon S3 存储桶
   + 一个端点用于访问适用于 LiveAnalytics 的 Timestream 表。

   有关说明，请参阅 [VPC 端点](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/integrations/kafka_connector#vpc-endpoints)。

1. 使用上传的 jar 文件创建自定义插件。

   有关详细说明，请参阅《Amazon MSK 开发人员指南》**中的[插件](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-plugins.html)。

1. 使用[工作程序配置参数](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/integrations/kafka_connector#worker-configuration-parameters)中描述的 JSON 内容，创建自定义工作程序配置。按照 中所述的说明进行操作 

   有关详细说明，请参阅《Amazon MSK 开发人员指南》**中的[创建自定义工作程序配置](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-workers.html#msk-connect-create-custom-worker-config)。

1. 创建服务执行 IAM 角色。

   有关详细说明，请参阅 [IAM 服务角色](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/integrations/kafka_connector#iam-service-role)。

1. 使用上述步骤中创建的自定义插件、自定义工作程序配置和服务执行 IAM 角色以及[示例连接器配置](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/integrations/kafka_connector#sample-connector-configuration)，创建 Amazon MSK 连接器。

   有关详细说明，请参阅《Amazon MSK 开发人员指南》**中的[创建连接器](https://docs.aws.amazon.com/msk/latest/developerguide/msk-connect-connectors.html#mkc-create-connector-intro)。

   请确保将以下配置参数的值更新为相应的值。有关详细信息，请参阅[连接器配置参数](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/integrations/kafka_connector#connector-configuration-parameters)。
   + `aws.region`
   + `timestream.schema.s3.bucket.name`
   + `timestream.ingestion.endpoint`

   连接器创建需要 5 至 10 分钟才能完成。当管道状态更改为 `Running` 时，表明管道已就绪。

1. 持续发布消息流，将数据写入创建的 Kafka 主题。

   有关详细说明，请参阅[如何使用](https://github.com/awslabs/amazon-timestream-tools/tree/mainline/tools/java/kafka_ingestor#how-to-use-it)。

1. 运行一个或多个查询，确保数据从 Amazon MSK 发送到 MSK Connect 再发送到适用于 LiveAnalytics 的 Timestream 表。

   有关详细说明，请参阅流程 [运行查询](console_timestream.md#console_timestream.queries.using-console)。

#### 其他资源
<a name="msk-kafka-connect-more-info"></a>

博客[使用 Kafka Connect 实现从 Kafka 集群到适用于 LiveAnalytics 的 Timestream 实时无服务器数据摄取](https://aws.amazon.com/blogs/database/real-time-serverless-data-ingestion-from-your-kafka-clusters-into-amazon-timestream-using-kafka-connect/)介绍如何使用适用于 LiveAnalytics 的 Timestream Kafka Sink Connector 设置端到端管道，从使用 Apache jMeter 测试计划的 Kafka 生成器客户端计算机开始，该计算机向 Kafka 主题发布数千条示例消息，再到在适用于 LiveAnalytics 的 Timestream 中验证摄取的记录。