将 OpenSearch 摄取管道与 Amazon Managed Streaming for Apache Kafka - 亚马逊 OpenSearch 服务

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

将 OpenSearch 摄取管道与 Amazon Managed Streaming for Apache Kafka

你可以使用 Kafka 插件将来自亚马逊 Apache Managed Streaming for Apache KafkaMSK(亚马 OpenSearch 逊)的数据提取到你的摄取管道中。借助亚马逊MSK,您可以构建和运行使用 Apache Kafka 来处理流数据的应用程序。 OpenSearch Ingestion AWS PrivateLink 用于连接亚马逊。MSK您可以从 Amazon MSK 和 Amazon MSK 无服务器集群中提取数据。这两个流程之间的唯一区别是在设置管道之前必须执行的先决步骤。

亚马逊MSK先决条件

在创建 OpenSearch 摄取管道之前,请执行以下步骤:

  1. 按照《适用于 A pache Kafka 的亚马逊托管流媒体 Kafka 开发者指南》中创建集群中的步骤创建亚马逊MSK预配置集群。对于代理类型,请选择除类型之外的任何选项,因为 OpenSearch Ingestion 不支持这些t3类型。

  2. 集群处于 “活动” 状态后,请按照开启多VPC连接中的步骤进行操作。

  3. 按照将集群策略附加到集MSK群中的步骤来附加以下策略之一,具体取决于您的集群和管道是否相同 AWS 账户。此策略允许 OpenSearch Ingestion 创建与您的 Amazon MSK 集群的 AWS PrivateLink 连接并从 Kafka 主题中读取数据。请务必使用自己的版本resource进行更新ARN。

    当集群与管道位于同一 AWS 账户时,适用以下策略:

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" } ] }

    如果您的 Amazon MSK 集群与您的管道 AWS 账户 不同,请改为附加以下策略。请注意,只有预配置的 Amazon 集群才能进行跨账户访问,Amazon MSK Serverless MSK 集群无法进行跨账户访问。的 ARN fo AWS principal r 应与您ARN为管道配置提供的相同管道角色相同:YAML

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "kafka-cluster:*", "kafka:*" ], "Resource": [ "arn:aws:kafka:us-east-1:{msk-account-id}:cluster/cluster-name/cluster-id", "arn:aws:kafka:us-east-1:{msk-account-id}:topic/cluster-name/cluster-id/*", "arn:aws:kafka:us-east-1:{msk-account-id}:group/cluster-name/*" ] } ] }
  4. 按照创建主题中的步骤创建 Kafka 主题。确保BootstrapServerString这是私有端点(单VPC)引导程序URLs之一。的值--replication-factor应为23,具体取决于您的 Amazon MSK 集群拥有的区域数量。--partitions 的值至少应为 10

  5. 按照生成和使用数据中的步骤生成和使用数据。再说一遍,请确保BootstrapServerString这是您的私有端点(单个VPC)引导程序URLs之一。

Amazon MSK 无服务器先决条件

在创建 OpenSearch 摄取管道之前,请执行以下步骤:

  1. 按照《适用MSK于 Apache Managed Streaming 的亚马逊管理流媒体 Kafka 开发者指南》中创建MSK无服务器集群中的步骤创建亚马逊无服务器集群。

  2. 集群处于 “活动” 状态后,按照将群集策略附加到MSK集群中的步骤来附加以下策略。请务必使用自己的版本resource进行更新ARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "Service": "osis.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" }, { "Effect": "Allow", "Principal": { "Service": "osis-pipelines.amazonaws.com" }, "Action": [ "kafka:CreateVpcConnection", "kafka:GetBootstrapBrokers", "kafka:DescribeClusterV2" ], "Resource": "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" } ] }

    此策略允许 OpenSearch Ingestion 创建与您的 Amazon MSK Serverless 集群的 AWS PrivateLink 连接并从 Kafka 主题中读取数据。当您的集群和管道处于相同状态时,此政策适用 AWS 账户,因为Amazon MSK Serverless不支持跨账户访问,因此必须如此。

  3. 按照创建主题中的步骤创建 Kafka 主题。确保BootstrapServerString这是您的简单身份验证和安全层 (SASL) IAM 引导程序URLs之一。的值--replication-factor应为23,具体取决于您的 Amazon MSK Serverless 集群拥有的区域数量。--partitions 的值至少应为 10

  4. 按照生成和使用数据中的步骤生成和使用数据。再说一遍,请确保BootstrapServerString这是您的简单身份验证和安全层 (SASL) IAM 引导程序URLs之一。

步骤 1:配置管道角色

设置 Amazon 预MSK配置集群或无服务器集群后,在管道角色中添加要在工作流配置中使用的以下 Kafka 权限:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "kafka-cluster:Connect", "kafka-cluster:AlterCluster", "kafka-cluster:DescribeCluster", "kafka:DescribeClusterV2", "kafka:GetBootstrapBrokers" ], "Resource": [ "arn:aws:kafka:us-east-1:{account-id}:cluster/cluster-name/cluster-id" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:*Topic*", "kafka-cluster:ReadData" ], "Resource": [ "arn:aws:kafka:us-east-1:{account-id}:topic/cluster-name/cluster-id/topic-name" ] }, { "Effect": "Allow", "Action": [ "kafka-cluster:AlterGroup", "kafka-cluster:DescribeGroup" ], "Resource": [ "arn:aws:kafka:us-east-1:{account-id}:group/cluster-name/*" ] } ] }

步骤 2:创建管道

然后,你可以配置如下所示的 OpenSearch 摄取管道,将 Kafka 指定为来源:

version: "2" log-pipeline: source: kafka: acknowledgements: true topics: - name: "topic-name" group_id: "group-id" aws: msk: arn: "arn:aws:kafka:{region}:{account-id}:cluster/cluster-name/cluster-id" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" processor: - grok: match: message: - "%{COMMONAPACHELOG}" - date: destination: "@timestamp" from_time_received: true sink: - opensearch: hosts: ["https://search-domain-endpoint.us-east-1.es.amazonaws.com"] index: "index_name" aws_sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" aws_region: "us-east-1" aws_sigv4: true

您可以使用预先配置的 Amazon MSK 蓝图来创建此管道。有关更多信息,请参阅 使用蓝图创建管道

步骤 3:(可选)使用 AWS Glue 架构注册表

当您将 OpenSearch Ingestion 与 Amazon 配合使用时MSK,您可以使用架构注册表中托管的架构AVRO的数据格式。 AWS Glue 在 AWS Glue 架构注册表中,您可以集中发现、控制和演变数据流架构。

要使用此选项,请在管道配置中启用架构 type

schema: type: "aws_glue"

您还必须在您的管道角色中提供 AWS Glue 读取访问权限。您可以使用名为的 AWS 托管策略AWSGlueSchemaRegistryReadonlyAccess。此外,您的注册表必须与您的 OpenSearch Ingestion 管道位于同一 AWS 账户 区域中。

步骤 4:(可选)为 Amazon MSK 管道配置推荐的计算单位 (OCUs)

每个计算单位的每个主题有一个使用者。代理在给定主题的使用者之间均衡分配分区。但是,当分区的数量大于使用者的数量时,Amazon 会在每个使用者上MSK托管多个分区。 OpenSearch Ingestion 具有内置的 auto Scaling,可根据CPU使用情况或管道中待处理记录的数量向上或向下扩展。

为实现最佳性能,请将分区分布在多个计算单位中以便并行处理。如果主题有大量分区(例如,超过 96 个,这是OCUs每个管道的最大分区),我们建议您将管道配置为 1— OCUs 96。因为它将根据需要自动扩缩。如果主题包含的分区数量较少(例如,少于 96 个),则最大计算单位应与分区数量相同。

当管道包含多个主题时,请选择分区数最多的主题作为参考来配置最大计算单位。通过向同一主题和使用者组添加另一个具有新集的OCUs管道,您可以几乎线性地扩展吞吐量。