使用與融 OpenSearch 合雲卡夫卡的攝入管道 - Amazon OpenSearch Service

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

使用與融 OpenSearch 合雲卡夫卡的攝入管道

您可以使用 OpenSearch 擷取管道將資料從 Confluent 雲端 Kafka 叢集串流到 Amazon OpenSearch 服務網域和無伺服器集合。 OpenSearch OpenSearch 擷取支援公用和私人網路組態,將資料從 Confluent Cloud Kafka 叢集串流到由 OpenSearch 服務或無伺服器管理的網域或集合。 OpenSearch

連接到統一雲公共卡夫卡集群

您可以使用 OpenSearch 擷取管線從 Confluent Cloud Kafka 叢集遷移資料,並具有公用設定,這表示網域DNS名稱可以公開解析。若要這麼做,請將 Confluent Cloud 公用 Kafka 叢集設定為來源,並將 OpenSearch 服務或 OpenSearch 無伺服器設定為目的地的 OpenSearch 擷取管道。這會將您的串流資料從自我管理的來源叢集處理到 AWS管理的目的地網域或集合。

必要條件

建立 OpenSearch 擷取管道之前,請執行下列步驟:

  1. 創建一個融合雲卡夫卡集群作為源。叢集應包含您要擷取至 OpenSearch 服務的資料。

  2. 建立您要將資料移轉至其中的 OpenSearch 服務網域或 OpenSearch 無伺服器集合。如需詳細資訊,請參閱建立 OpenSearch 服務網域建立集合

  3. 設置身份驗證您的協流雲卡夫卡群集與. AWS Secrets Manager按照旋轉密碼中的步驟啟用AWS Secrets Manager 密碼輪換。

  4. 資源型政策附加至您的網域,或將資料存取原則附加至您的集合。這些存取原則可讓 OpenSearch 擷取將資料從自我管理的叢集寫入您的網域或集合。

    下列範例網域存取原則允許您在下一個步驟中建立的管線角色將資料寫入網域。確保您使用自己resource的更新ARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "es:DescribeDomain", "es:ESHttp*" ], "Resource": [ "arn:aws:es:{region}:{account-id}:domain/domain-name" ] } ] }

    若要建立具有正確權限的IAM角色,以存取集合或網域的寫入資料,請參閱網域的必要權限集合的必要權限

步驟 1:設定管線角色

設定 Confluent Cloud Kafka 叢集管線先決條件之後,請設定要在管線組態中使用的管線角色,並新增寫入 OpenSearch 服務網域或 OpenSearch 無伺服器集合的權限,以及從 Secrets Manager 讀取密碼的權限。

管理網路介面需要下列權限:

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:AttachNetworkInterface", "ec2:CreateNetworkInterface", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DeleteNetworkInterfacePermission", "ec2:DetachNetworkInterface", "ec2:DescribeNetworkInterfaces" ], "Resource": [ "arn:aws:ec2:*:{account-id}:network-interface/*", "arn:aws:ec2:*:{account-id}:subnet/*", "arn:aws:ec2:*:{account-id}:security-group/*" ] }, { "Effect": "Allow", "Action": [ "ec2:DescribeDhcpOptions", "ec2:DescribeRouteTables", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:Describe*" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "ec2:CreateTags" ], "Resource": "arn:aws:ec2:*:*:network-interface/*", "Condition": { "StringEquals": { "aws:RequestTag/OSISManaged": "true" } } } ] }

以下是從 AWS Secrets Manager 服務讀取密碼所需的權限:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "SecretsManagerReadAccess", "Effect": "Allow", "Action": ["secretsmanager:GetSecretValue"], "Resource": ["arn:aws:secretsmanager:<region:<account-id>:secret:<,secret-name>"] } ] }

寫入 Amazon OpenSearch 服務域需要以下許可:

{ "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{your-account-id}:role/{pipeline-role}" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:{region}:{your-account-id}:domain/{domain-name}/*" } ] }

步驟 2:建立管道

然後,您可以配置如下所示的 OpenSearch 擷取管道,該管道會將您的 Confluent Cloud Kafka 指定為來源。

您可以指定多個 OpenSearch 服務網域做為資料的目的地。此功能可讓傳入資料進行條件式路由或複寫至多個 OpenSearch Service 網域。

您也可以將資料從來源 Confluent Kafka 叢集遷移到無伺服器集合 OpenSearch 。VPC請務必在管線組態中提供網路存取原則。您可以使用 Confluent 模式註冊表來定義一個 Confluent 模式。

version: "2" kafka-pipeline: source: kafka: encryption: type: "ssl" topics: - name: "topic-name" group_id: "group-id" bootstrap_servers: - "bootstrap-server.us-west-2.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: ${aws_secrets:confluent-kafka-secret:username} password: ${aws_secrets:confluent-kafka-secret:password} schema: type: confluent registry_url: https://my-registry.us-west-2.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: ["https://search-mydomain.us-west-2.es.amazonaws.com"] aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-west-2" index: "confluent-index" extension: aws: secrets: confluent-kafka-secret: secret_id: "my-kafka-secret" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" schema-secret: secret_id: "my-self-managed-kafka-schema" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

您可以使用預先設定的藍圖來建立此管道。如需詳細資訊,請參閱使用藍圖建立管道

連接到一致雲卡夫卡集群 VPC

您也可以使用 OpenSearch 擷取管線,將資料從在. VPC 若要這麼做,請將 Confluent Cloud Kafka 叢集設定為來源,並將 OpenSearch 服務或 OpenSearch 無伺服器設定為目的地的 OpenSearch 擷取管線。這會將您的串流資料從 Confluent Cloud Kafka 來源叢集處理至 AWS受管理的目的地網域或集合。

OpenSearch 擷取支援在 Confluent 中所有支援的網路模式中設定的結合雲端 Kafka 叢集。 OpenSearch 擷取中支援下列網路組態模式作為來源:

  • AWS VPC凝視

  • AWS PrivateLink 適用於專用叢集

  • AWS PrivateLink 適用於企業叢集

  • AWS Transit Gateway

必要條件

建立 OpenSearch 擷取管道之前,請執行下列步驟:

  1. 使用包含要導入服務的數據的VPC網絡配置創建 Confluent Cloud Kafka 集群。 OpenSearch

  2. 建立您要將資料移轉至其中的 OpenSearch 服務網域或 OpenSearch 無伺服器集合。如需詳細資訊,請參閱建立 OpenSearch 服務網域建立集合

  3. 設置身份驗證您的協流雲卡夫卡群集與. AWS Secrets Manager按照旋轉密碼中的步驟啟用AWS Secrets Manager 密碼輪換。

  4. 獲取VPC可以訪問自我管理卡夫卡的 ID。選擇VPCCIDR要由 OpenSearch 擷取使用的。

    注意

    如果您使用建立管道,您還必須將 OpenSearch 擷取管道附加到您的,才能使用自我管理的 Kafka。 AWS Management Console VPC若要這麼做,請找到 [網路組態] 區段,選取 [連接至] VPC 核取方塊,然後CIDR從提供的預設選項中選擇您的選項,或選取您自己的選項。您可以使用 CIDR「RFC1918 最新最佳作法」中定義的私人位址空間中的任何位址空間。

    若要提供自訂CIDR,請從下拉式功能表中選取「其他」。若要避免 OpenSearch 擷取與自我管理之間的 IP 位址發生衝突 OpenSearch,請確定自我管理 OpenSearch VPCCIDR與擷取的CIDR不同。 OpenSearch

  5. 資源型政策附加至您的網域,或將資料存取原則附加至您的集合。這些存取原則可讓 OpenSearch 擷取將資料從自我管理的叢集寫入您的網域或集合。

    下列範例網域存取原則允許您在下一個步驟中建立的管線角色將資料寫入網域。確保您使用自己resource的更新ARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "es:DescribeDomain", "es:ESHttp*" ], "Resource": [ "arn:aws:es:{region}:{account-id}:domain/domain-name" ] } ] }

    若要建立具有正確權限的IAM角色,以存取集合或網域的寫入資料,請參閱網域的必要權限集合的必要權限

步驟 1:設定管線角色

設定管線必要條件後,請設定要在管線組態中使用的管線角色,並在角色中新增下列權限:

{ "Version": "2012-10-17", "Statement": [ { "Sid": "SecretsManagerReadAccess", "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue" ], "Resource": ["arn:aws:secretsmanager:{region}:{account-id}:secret:secret-name"] }, { "Effect": "Allow", "Action": [ "ec2:AttachNetworkInterface", "ec2:CreateNetworkInterface", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DeleteNetworkInterfacePermission", "ec2:DetachNetworkInterface", "ec2:DescribeNetworkInterfaces" ], "Resource": [ "arn:aws:ec2:*:{account-id}:network-interface/*", "arn:aws:ec2:*:{account-id}:subnet/*", "arn:aws:ec2:*:{account-id}:security-group/*" ] }, { "Effect": "Allow", "Action": [ "ec2:DescribeDhcpOptions", "ec2:DescribeRouteTables", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:Describe*" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "ec2:CreateTags" ], "Resource": "arn:aws:ec2:*:*:network-interface/*", "Condition": { "StringEquals": { "aws:RequestTag/OSISManaged": "true" } } } ] }

您必須對用於建立 OpenSearch 擷取管道的IAM角色提供上述 Amazon EC2 許可,因為管道使用這些許可在您VPC的網路界面中建立和刪除網路界面。管線只能透過此網路介面存取 Kafka 叢集。

步驟 2:建立管道

然後,您可以配置如下所示的 OpenSearch 擷取管道,該管道將 Kafka 指定為來源。

您可以指定多個 OpenSearch 服務網域做為資料的目的地。此功能可讓傳入資料進行條件式路由或複寫至多個 OpenSearch Service 網域。

您也可以將資料從來源 Confluent Kafka 叢集遷移到無伺服器集合 OpenSearch 。VPC請務必在管線組態中提供網路存取原則。您可以使用 Confluent 模式註冊表來定義一個 Confluent 模式。

version: "2" kafka-pipeline: source: kafka: encryption: type: "ssl" topics: - name: "topic-name" group_id: "group-id" bootstrap_servers: - "bootstrap-server.us-west-2.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: ${aws_secrets:confluent-kafka-secret:username} password: ${aws_secrets:confluent-kafka-secret:password} schema: type: confluent registry_url: https://my-registry.us-west-2.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: ["https://search-mydomain.us-west-2.es.amazonaws.com"] aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-west-2" index: "confluent-index" extension: aws: secrets: confluent-kafka-secret: secret_id: "my-kafka-secret" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" schema-secret: secret_id: "my-self-managed-kafka-schema" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

您可以使用預先設定的藍圖來建立此管道。如需詳細資訊,請參閱使用藍圖建立管道