Confluent Cloud Kafka での OpenSearch Ingestion パイプラインの使用 - Amazon OpenSearch サービス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Confluent Cloud Kafka での OpenSearch Ingestion パイプラインの使用

OpenSearch Ingestion パイプラインを使用して、Confluent Cloud Kafka クラスターから Amazon OpenSearch Service ドメインおよび OpenSearch Serverless コレクションにデータをストリーミングできます。 OpenSearch Ingestion は、Confluent Cloud Kafka クラスターから OpenSearch サービスまたは OpenSearch Serverless によって管理されるドメインまたはコレクションにデータをストリーミングするためのパブリックネットワーク設定とプライベートネットワーク設定の両方をサポートします。

Confluent Cloud パブリック Kafka クラスターへの接続

Ingestion OpenSearch パイプラインを使用して、パブリック設定の Confluent Cloud Kafka クラスターからデータを移行できます。つまり、DNSドメイン名をパブリックに解決できます。これを行うには、Confluent Cloud OpenSearch パブリック Kafka クラスターをソースとして、 OpenSearch サービスまたは OpenSearch サーバーレスを送信先として Ingestion パイプラインを設定します。これにより、自己管理型ソースクラスターから 管理 AWS型送信先ドメインまたはコレクションへのストリーミングデータが処理されます。

前提条件

OpenSearch 取り込みパイプラインを作成する前に、次のステップを実行します。

  1. ソースとして機能する Confluent Cloud Kafka クラスターを作成します。クラスターには、 OpenSearch サービスに取り込むデータが含まれている必要があります。

  2. データを移行する OpenSearch サービスドメインまたは OpenSearch サーバーレスコレクションを作成します。詳細については、 OpenSearch 「サービスドメインの作成」および「コレクションの作成」を参照してください。

  3. を使用して Confluent Cloud Kafka クラスターで認証を設定します AWS Secrets Manager。「シークレットのローテーション」の手順に従って、AWS Secrets Manager シークレットのローテーションを有効にします。

  4. リソースベースのポリシーをドメインにアタッチするか、データアクセスポリシーをコレクションにアタッチします。これらのアクセスポリシーにより、Ingestion OpenSearch は自己管理型クラスターからドメインまたはコレクションにデータを書き込むことができます。

    次のサンプルドメインアクセスポリシーでは、次のステップで作成するパイプラインロールがドメインにデータを書き込むことを許可します。必ず独自の resourceで を更新してくださいARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "es:DescribeDomain", "es:ESHttp*" ], "Resource": [ "arn:aws:es:{region}:{account-id}:domain/domain-name" ] } ] }

    コレクションまたはドメインへの書き込みデータにアクセスするための正しいアクセス許可を持つ IAMロールを作成するには、「ドメインに必要なアクセス許可」および「コレクションに必要なアクセス許可」を参照してください。

ステップ 1: パイプラインロールを設定する

Confluent Cloud Kafka クラスターパイプラインの前提条件を設定したら、パイプライン設定で使用するパイプラインロールを設定し、 OpenSearch サービスドメインまたは OpenSearch サーバーレスコレクションに書き込むアクセス許可と、Secrets Manager からシークレットを読み取るアクセス許可を追加します。

ネットワークインターフェイスを管理するには、次のアクセス許可が必要です。

{ "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Action": [ "ec2:AttachNetworkInterface", "ec2:CreateNetworkInterface", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DeleteNetworkInterfacePermission", "ec2:DetachNetworkInterface", "ec2:DescribeNetworkInterfaces" ], "Resource": [ "arn:aws:ec2:*:{account-id}:network-interface/*", "arn:aws:ec2:*:{account-id}:subnet/*", "arn:aws:ec2:*:{account-id}:security-group/*" ] }, { "Effect": "Allow", "Action": [ "ec2:DescribeDhcpOptions", "ec2:DescribeRouteTables", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:Describe*" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "ec2:CreateTags" ], "Resource": "arn:aws:ec2:*:*:network-interface/*", "Condition": { "StringEquals": { "aws:RequestTag/OSISManaged": "true" } } } ] }

以下は、 AWS Secrets Manager サービスからシークレットを読み取るために必要なアクセス許可です。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "SecretsManagerReadAccess", "Effect": "Allow", "Action": ["secretsmanager:GetSecretValue"], "Resource": ["arn:aws:secretsmanager:<region:<account-id>:secret:<,secret-name>"] } ] }

Amazon OpenSearch Service ドメインに書き込むには、次のアクセス許可が必要です。

{ "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{your-account-id}:role/{pipeline-role}" }, "Action": ["es:DescribeDomain", "es:ESHttp*"], "Resource": "arn:aws:es:{region}:{your-account-id}:domain/{domain-name}/*" } ] }

ステップ 2: パイプラインを作成する

その後、Confluent Cloud Kafka OpenSearch をソースとして指定する Ingestion パイプラインを次のように設定できます。

複数の OpenSearch サービスドメインをデータの送信先として指定できます。この機能により、受信データを複数の OpenSearch サービスドメインに条件付きでルーティングまたはレプリケーションできます。

ソース Confluent Kafka クラスターから OpenSearch サーバーレスVPCコレクションにデータを移行することもできます。パイプライン設定内でネットワークアクセスポリシーを指定していることを確認します。Confluent スキーマレジストリを使用して、Confluent スキーマを定義できます。

version: "2" kafka-pipeline: source: kafka: encryption: type: "ssl" topics: - name: "topic-name" group_id: "group-id" bootstrap_servers: - "bootstrap-server.us-west-2.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: ${aws_secrets:confluent-kafka-secret:username} password: ${aws_secrets:confluent-kafka-secret:password} schema: type: confluent registry_url: https://my-registry.us-west-2.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: ["https://search-mydomain.us-west-2.es.amazonaws.com"] aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-west-2" index: "confluent-index" extension: aws: secrets: confluent-kafka-secret: secret_id: "my-kafka-secret" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" schema-secret: secret_id: "my-self-managed-kafka-schema" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

事前設定された設計図を使用して、このパイプラインを作成できます。詳細については、「ブループリントを使用したパイプラインの作成」を参照してください。

内の Confluent Cloud Kafka クラスターへの接続 VPC

Ingestion OpenSearch パイプラインを使用して、 で実行されている Confluent Cloud Kafka クラスターからデータを移行することもできますVPC。そのためには、Confluent Cloud Kafka クラスターをソースとして、 OpenSearch サービスまたは OpenSearch サーバーレスを送信先として OpenSearch Ingestion パイプラインを設定します。これにより、Confluent Cloud Kafka ソースクラスターから が AWS管理する送信先ドメインまたはコレクションへのストリーミングデータが処理されます。

OpenSearch Ingestion は、Confluent でサポートされているすべてのネットワークモードで設定された Confluent Cloud Kafka クラスターをサポートします。以下のネットワーク設定モードが Ingestion OpenSearch のソースとしてサポートされています。

  • AWS VPC ピアリング

  • AWS PrivateLink 専用クラスター用の

  • AWS PrivateLink for Enterprise クラスター

  • AWS Transit Gateway

前提条件

OpenSearch 取り込みパイプラインを作成する前に、次のステップを実行します。

  1. OpenSearch サービスに取り込むデータを含むVPCネットワーク設定で Confluent Cloud Kafka クラスターを作成します。

  2. データを移行する OpenSearch サービスドメインまたは OpenSearch サーバーレスコレクションを作成します。詳細については、 OpenSearch 「サービスドメインの作成」および「コレクションの作成」を参照してください。

  3. を使用して Confluent Cloud Kafka クラスターで認証を設定します AWS Secrets Manager。「シークレットのローテーション」の手順に従って、AWS Secrets Manager シークレットのローテーションを有効にします。

  4. セルフマネージド Kafka VPCにアクセスできる の ID を取得します。取り込みVPCCIDRで使用する OpenSearch を選択します。

    注記

    を使用してパイプライン AWS Management Console を作成する場合は、セルフマネージド Kafka OpenSearch を使用するには、取り込みパイプラインも VPCにアタッチする必要があります。これを行うには、ネットワーク設定セクションを検索し、「アタッチ先VPC」チェックボックスをオンにして、提供されているデフォルトオプションのいずれかCIDRから を選択するか、独自のものを選択します。RFC 1918 年ベストプラクティス で定義されているように、プライベートアドレス空間CIDRから任意の を使用できます。

    カスタム を指定するにはCIDR、ドロップダウンメニューからその他を選択します。取り込み とセルフマネージド OpenSearch の間の IP アドレスの衝突を回避するには OpenSearch、セルフマネージド OpenSearch VPC CIDR が取り込み CIDR の OpenSearch と異なることを確認してください。

  5. リソースベースのポリシーをドメインにアタッチするか、コレクションにデータアクセスポリシーをアタッチします。これらのアクセスポリシーにより、Ingestion OpenSearch は自己管理型クラスターからドメインまたはコレクションにデータを書き込むことができます。

    注記

    AWS PrivateLink を使用して Confluent Cloud Kafka を接続する場合は、VPCDHCPオプション を設定する必要があります。DNS ホスト名DNS解決を有効にする必要があります。

    次のサンプルドメインアクセスポリシーでは、次のステップで作成するパイプラインロールがドメインにデータを書き込むことを許可します。必ず独自の resourceで を更新してくださいARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "es:DescribeDomain", "es:ESHttp*" ], "Resource": [ "arn:aws:es:{region}:{account-id}:domain/domain-name" ] } ] }

    コレクションまたはドメインへの書き込みデータにアクセスするための正しいアクセス許可を持つ IAMロールを作成するには、「ドメインに必要なアクセス許可」および「コレクションに必要なアクセス許可」を参照してください。

ステップ 1: パイプラインロールを設定する

パイプラインの前提条件を設定したら、パイプライン設定で使用するパイプラインロールを設定し、ロールに次のアクセス許可を追加します。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "SecretsManagerReadAccess", "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue" ], "Resource": ["arn:aws:secretsmanager:{region}:{account-id}:secret:secret-name"] }, { "Effect": "Allow", "Action": [ "ec2:AttachNetworkInterface", "ec2:CreateNetworkInterface", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DeleteNetworkInterfacePermission", "ec2:DetachNetworkInterface", "ec2:DescribeNetworkInterfaces" ], "Resource": [ "arn:aws:ec2:*:{account-id}:network-interface/*", "arn:aws:ec2:*:{account-id}:subnet/*", "arn:aws:ec2:*:{account-id}:security-group/*" ] }, { "Effect": "Allow", "Action": [ "ec2:DescribeDhcpOptions", "ec2:DescribeRouteTables", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:Describe*" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "ec2:CreateTags" ], "Resource": "arn:aws:ec2:*:*:network-interface/*", "Condition": { "StringEquals": { "aws:RequestTag/OSISManaged": "true" } } } ] }

パイプラインはこれらのEC2アクセス許可を使用して OpenSearch でネットワークインターフェイスを作成および削除するため、取り込みパイプラインの作成に使用するIAMロールに対して上記の Amazon アクセス許可を提供する必要がありますVPC。パイプラインは、このネットワークインターフェイスを介してのみ Kafka クラスターにアクセスできます。

ステップ 2: パイプラインを作成する

その後、Kafka OpenSearch をソースとして指定する Ingestion パイプラインを次のように設定できます。

複数の OpenSearch サービスドメインをデータの送信先として指定できます。この機能により、受信データを複数の OpenSearch サービスドメインに条件付きでルーティングまたはレプリケーションできます。

ソース Confluent Kafka クラスターから OpenSearch サーバーレスVPCコレクションにデータを移行することもできます。パイプライン設定内でネットワークアクセスポリシーを指定していることを確認します。Confluent スキーマレジストリを使用して、Confluent スキーマを定義できます。

version: "2" kafka-pipeline: source: kafka: encryption: type: "ssl" topics: - name: "topic-name" group_id: "group-id" bootstrap_servers: - "bootstrap-server.us-west-2.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: ${aws_secrets:confluent-kafka-secret:username} password: ${aws_secrets:confluent-kafka-secret:password} schema: type: confluent registry_url: https://my-registry.us-west-2.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: ["https://search-mydomain.us-west-2.es.amazonaws.com"] aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-west-2" index: "confluent-index" extension: aws: secrets: confluent-kafka-secret: secret_id: "my-kafka-secret" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" schema-secret: secret_id: "my-self-managed-kafka-schema" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

事前設定された設計図を使用して、このパイプラインを作成できます。詳細については、「ブループリントを使用したパイプラインの作成」を参照してください。