在 Confluent Cl OpenSearch oud Kafka 中使用采集管道 - 亚马逊 OpenSearch 服务

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

在 Confluent Cl OpenSearch oud Kafka 中使用采集管道

您可以使用 OpenSearch 摄取管道将数据从 Confluent Cloud Kafka 集群流式传输到 OpenSearch 亚马逊服务域和无服务器集合。 OpenSearch OpenSearch Ingestion 支持公用和私有网络配置,用于将数据从 Confluent Cloud Kafka 集群流式传输到由服务或无服务器管理的域或集合。 OpenSearch OpenSearch

连接 Confluent Cloud 公共 Kafka 集群

您可以使用 OpenSearch 摄取管道从具有公共配置的 Confluent Cloud Kafka 集群迁移数据,这意味着可以公开解析域DNS名。为此,请设置一个 OpenSearch 采集管道,将 Confluent Cloud 公共 Kafka 集群作为源,将 OpenSearch 服务或 OpenSearch 无服务器作为目标。这将处理您从自行管理的源集群到托 AWS管目标域或集合的流式数据。

先决条件

在创建 OpenSearch Ingestion 管道之前,请执行以下步骤:

  1. 创建充当源的 Confluent Cloud Kafka 集群。集群应包含您要采集到 S OpenSearch ervice 中的数据。

  2. 创建要将数据迁移到的 OpenSearch 服务域或 OpenSearch 无服务器集合。有关更多信息,请参阅创建 OpenSearch 服务域创建集合

  3. 使用在 Confluent Cloud Kafka 集群上设置身份验证。 AWS Secrets Manager按照轮换密钥中的步骤启用AWS Secrets Manager 密钥轮换

  4. 基于资源的策略附加到您的网域,或者将数据访问策略附加到您的馆藏。这些访问策略允许 OpenSearch Ingestion 将数据从您的自管理集群写入您的域或集合。

    以下示例域访问策略允许您在下一步中创建的管道角色向域写入数据。请务必使用自己的版本resource进行更新ARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "es:DescribeDomain", "es:ESHttp*" ], "Resource": [ "arn:aws:es:{region}:{account-id}:domain/domain-name" ] } ] }

    要创建具有访问集合或域名写入数据的正确权限的IAM角色,请参阅域的必需权限集合的必需权限

步骤 1:配置管道角色

设置 Confluent Cloud Kafka 集群管道先决条件后,配置要在管道配置中使用的管道角色,添加写入 OpenSearch 服务域或 OpenSearch 无服务器集合的权限,以及从 Secrets Manager 读取密钥的权限。

管理网络接口需要以下权限:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:AttachNetworkInterface", "ec2:CreateNetworkInterface", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DeleteNetworkInterfacePermission", "ec2:DetachNetworkInterface", "ec2:DescribeNetworkInterfaces" ], "Resource": [ "arn:aws:ec2:*:{account-id}:network-interface/*", "arn:aws:ec2:*:{account-id}:subnet/*", "arn:aws:ec2:*:{account-id}:security-group/*" ] }, { "Effect": "Allow", "Action": [ "ec2:DescribeDhcpOptions", "ec2:DescribeRouteTables", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:Describe*" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "ec2:CreateTags" ], "Resource": "arn:aws:ec2:*:*:network-interface/*", "Condition": { "StringEquals": { "aws:RequestTag/OSISManaged": "true" } } } ] }

以下是从 AWS Secrets Manager 服务中读取机密所需的权限:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "SecretsManagerReadAccess", "Effect": "Allow", "Action": ["secretsmanager:GetSecretValue"], "Resource": ["arn:aws:secretsmanager:<region:<account-id>:secret:<,secret-name>"] } ] }

写入亚马逊 OpenSearch 服务域需要以下权限:

{ "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{your-account-id}:role/{pipeline-role}" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:{region}:{your-account-id}:domain/{domain-name}/*" } ] }

步骤 2:创建管道

然后,你可以配置如下所示的 OpenSearch 摄取管道,它将你的 Confluent Cloud Kafka 指定为来源。

您可以将多个 OpenSearch 服务域指定为数据的目的地。此功能允许有条件地路由或将传入数据复制到多个 OpenSearch 服务域。

您还可以将数据从源 Confluent Kafka 集群迁移到无服务器集合。 OpenSearch VPC确保在管道配置中提供网络访问策略。您可以使用 Confluent 架构注册表来定义 Confluent 架构。

version: "2" kafka-pipeline: source: kafka: encryption: type: "ssl" topics: - name: "topic-name" group_id: "group-id" bootstrap_servers: - "bootstrap-server.us-west-2.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: ${aws_secrets:confluent-kafka-secret:username} password: ${aws_secrets:confluent-kafka-secret:password} schema: type: confluent registry_url: https://my-registry.us-west-2.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: ["https://search-mydomain.us-west-2.es.amazonaws.com"] aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-west-2" index: "confluent-index" extension: aws: secrets: confluent-kafka-secret: secret_id: "my-kafka-secret" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" schema-secret: secret_id: "my-self-managed-kafka-schema" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

您可以使用预配置的蓝图来创建此管道。有关更多信息,请参阅 使用蓝图创建管道

连接到 Confluent Cloud Kafka 集群 VPC

您还可以使用 OpenSearch 摄取管道从中运行的 Confluent Cloud Kafka 集群迁移数据。VPC为此,请设置一个 OpenSearch 采集管道,将 Confluent Cloud Kafka 集群作为源,将 OpenSearch 服务或 OpenSearch 无服务器作为目标。这将处理您从 Confluent Cloud Kafka 源集群到 AWS托管目标域或集合的流式数据。

OpenSearch Ingestion 支持在 Confluent 中以所有支持的网络模式配置的 Confluent Cloud Kafka 集群。 OpenSearch Ingestion 支持以下网络配置模式作为来源:

  • AWS VPC凝视

  • AWS PrivateLink 适用于专用集群

  • AWS PrivateLink 适用于企业集群

  • AWS Transit Gateway

先决条件

在创建 OpenSearch Ingestion 管道之前,请执行以下步骤:

  1. 创建一个 Confluent Cloud Kafka 集群,其VPC网络配置包含您要提取到服务中的数据。 OpenSearch

  2. 创建要将数据迁移到的 OpenSearch 服务域或 OpenSearch 无服务器集合。有关更多信息,请参阅创建 OpenSearch 服务域创建集合

  3. 使用在 Confluent Cloud Kafka 集群上设置身份验证。 AWS Secrets Manager按照轮换密钥中的步骤启用AWS Secrets Manager 密钥轮换

  4. 获取有权访问自管 Kafka 的用户的 ID。VPC选择VPCCIDR要由 OpenSearch Ingestion 使用的。

    注意

    如果您使用创建管道,则还必须将您的 OpenSearch 摄取管道连接到您的管道,才能使用自我管理的 VPC Kafka。 AWS Management Console 为此,请找到 “网络配置” 部分,选中 “连接到” VPC 复选框,然后CIDR从提供的默认选项中选择您的,或者选择自己的选项。您可以按照 RFC1918 年《当前最佳实践》的定义,使用私有地址空间中的任何CIDR一个。

    要提供自定义CIDR,请从下拉菜单中选择 “其他”。为避免 OpenSearch 摄取和自我管理之间的 IP 地址冲突 OpenSearch,请确保自我管理与 for Ingest OpenSearch VPC CIDR ion 不同。CIDR OpenSearch

  5. 基于资源的策略附加到您的网域,或者将数据访问策略附加到您的馆藏。这些访问策略允许 OpenSearch Ingestion 将数据从您的自管理集群写入您的域或集合。

    以下示例域访问策略允许您在下一步中创建的管道角色向域写入数据。请务必使用自己的版本resource进行更新ARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "es:DescribeDomain", "es:ESHttp*" ], "Resource": [ "arn:aws:es:{region}:{account-id}:domain/domain-name" ] } ] }

    要创建具有访问集合或域名写入数据的正确权限的IAM角色,请参阅域的必需权限集合的必需权限

步骤 1:配置管道角色

设置好管道先决条件后,配置要在工作流配置中使用的管道角色,并在该角色中添加以下权限:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "SecretsManagerReadAccess", "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue" ], "Resource": ["arn:aws:secretsmanager:{region}:{account-id}:secret:secret-name"] }, { "Effect": "Allow", "Action": [ "ec2:AttachNetworkInterface", "ec2:CreateNetworkInterface", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DeleteNetworkInterfacePermission", "ec2:DetachNetworkInterface", "ec2:DescribeNetworkInterfaces" ], "Resource": [ "arn:aws:ec2:*:{account-id}:network-interface/*", "arn:aws:ec2:*:{account-id}:subnet/*", "arn:aws:ec2:*:{account-id}:security-group/*" ] }, { "Effect": "Allow", "Action": [ "ec2:DescribeDhcpOptions", "ec2:DescribeRouteTables", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:Describe*" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "ec2:CreateTags" ], "Resource": "arn:aws:ec2:*:*:network-interface/*", "Condition": { "StringEquals": { "aws:RequestTag/OSISManaged": "true" } } } ] }

您必须为用于创建 OpenSearch Ingestion 管道的IAM角色提供上述 Amazon EC2 权限,因为管道使用这些权限在您的中创建和删除网络接口。VPC管道只能通过此网络接口访问 Kafka 集群。

步骤 2:创建管道

然后,您可以配置如下所示的 OpenSearch Ingestion 管道,将 Kafka 指定为来源。

您可以将多个 OpenSearch 服务域指定为数据的目的地。此功能允许有条件地路由或将传入数据复制到多个 OpenSearch 服务域。

您还可以将数据从源 Confluent Kafka 集群迁移到无服务器集合。 OpenSearch VPC确保在管道配置中提供网络访问策略。您可以使用 Confluent 架构注册表来定义 Confluent 架构。

version: "2" kafka-pipeline: source: kafka: encryption: type: "ssl" topics: - name: "topic-name" group_id: "group-id" bootstrap_servers: - "bootstrap-server.us-west-2.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: ${aws_secrets:confluent-kafka-secret:username} password: ${aws_secrets:confluent-kafka-secret:password} schema: type: confluent registry_url: https://my-registry.us-west-2.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: ["https://search-mydomain.us-west-2.es.amazonaws.com"] aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-west-2" index: "confluent-index" extension: aws: secrets: confluent-kafka-secret: secret_id: "my-kafka-secret" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" schema-secret: secret_id: "my-self-managed-kafka-schema" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

您可以使用预配置的蓝图来创建此管道。有关更多信息,请参阅 使用蓝图创建管道