Kafka OpenSearch での取り込みパイプラインの使用 - Amazon OpenSearch サービス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Kafka OpenSearch での取り込みパイプラインの使用

自己管理型 Kafka で Ingestion OpenSearch パイプラインを使用すると、Amazon OpenSearch Service ドメインと OpenSearch Serverless コレクションにデータをストリーミングできます。Ingestion OpenSearch は、自己管理型 Kafka から OpenSearch サービスまたは OpenSearch Serverless によって管理されるドメインまたはコレクションにデータをストリーミングするためのパブリックネットワーク設定とプライベートネットワーク設定の両方をサポートします。

パブリック Kafka クラスターへの接続

Ingestion OpenSearch パイプラインを使用して、パブリック設定のセルフマネージド Kafka クラスターからデータを移行できます。つまり、DNSドメイン名をパブリックに解決できます。これを行うには、セルフマネージド Kafka OpenSearch をソースとして、 OpenSearch Service または OpenSearch Serverless を送信先として Ingestion パイプラインを設定します。これにより、自己管理型ソースクラスターから 管理 AWS型送信先ドメインまたはコレクションへのストリーミングデータが処理されます。

前提条件

OpenSearch 取り込みパイプラインを作成する前に、次のステップを実行します。

  1. パブリックネットワーク設定でセルフマネージド Kafka クラスターを作成します。クラスターには、 OpenSearch サービスに取り込むデータが含まれている必要があります。

  2. データを移行する OpenSearch サービスドメインまたは OpenSearch サーバーレスコレクションを作成します。詳細については、 OpenSearch 「サービスドメインの作成」および「コレクションの作成」を参照してください。

  3. を使用して、セルフマネージドクラスターで認証を設定します AWS Secrets Manager。「シークレットのローテーション」の手順に従って、AWS Secrets Manager シークレットのローテーションを有効にします。

  4. リソースベースのポリシーをドメインにアタッチするか、データアクセスポリシーをコレクションにアタッチします。これらのアクセスポリシーにより、Ingestion OpenSearch は自己管理型クラスターからドメインまたはコレクションにデータを書き込むことができます。

    次のサンプルドメインアクセスポリシーでは、次のステップで作成するパイプラインロールがドメインにデータを書き込むことを許可します。必ず独自の resourceで を更新してくださいARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "es:DescribeDomain", "es:ESHttp*" ], "Resource": [ "arn:aws:es:{region}:{account-id}:domain/domain-name" ] } ] }

    コレクションまたはドメインへの書き込みデータにアクセスするための正しいアクセス許可を持つ IAMロールを作成するには、「ドメインに必要なアクセス許可」および「コレクションに必要なアクセス許可」を参照してください。

ステップ 1: パイプラインロールを設定する

Kafka パイプラインの前提条件を設定したら、パイプライン設定で使用するパイプラインロールを設定し、 OpenSearch サービスドメインまたは OpenSearch サーバーレスコレクションに書き込むアクセス許可と、Secrets Manager からシークレットを読み取るアクセス許可を追加します。

ステップ 2: パイプラインを作成する

その後、Kafka OpenSearch をソースとして指定する Ingestion パイプラインを次のように設定できます。

複数の OpenSearch サービスドメインをデータの送信先として指定できます。この機能により、受信データを複数の OpenSearch サービスドメインに条件付きでルーティングまたはレプリケーションできます。

ソース Confluent Kafka クラスターから OpenSearch サーバーレスVPCコレクションにデータを移行することもできます。パイプライン設定内でネットワークアクセスポリシーを指定していることを確認します。Confluent スキーマレジストリを使用して、Confluent スキーマを定義できます。

version: "2" kafka-pipeline: source: kafka: encryption: type: "ssl" topics: - name: "topic-name" group_id: "group-id" bootstrap_servers: - "bootstrap-server.us-west-2.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: ${aws_secrets:confluent-kafka-secret:username} password: ${aws_secrets:confluent-kafka-secret:password} schema: type: confluent registry_url: https://my-registry.us-west-2.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: ["https://search-mydomain.us-west-2.es.amazonaws.com"] aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-west-2" index: "confluent-index" extension: aws: secrets: confluent-kafka-secret: secret_id: "my-kafka-secret" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" schema-secret: secret_id: "my-self-managed-kafka-schema" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

事前設定された設計図を使用して、このパイプラインを作成できます。詳細については、「ブループリントを使用したパイプラインの作成」を参照してください。

内の Kafka クラスターへの接続 VPC

取り込みパイプラインを使用して、 OpenSearch で実行されているセルフマネージド Kafka クラスターからデータを移行することもできますVPC。これを行うには、セルフマネージド Kafka OpenSearch をソースとして、 OpenSearch Service または OpenSearch Serverless を送信先として Ingestion パイプラインを設定します。これにより、自己管理型ソースクラスターから 管理 AWS型送信先ドメインまたはコレクションへのストリーミングデータが処理されます。

前提条件

OpenSearch 取り込みパイプラインを作成する前に、次のステップを実行します。

  1. OpenSearch サービスに取り込むデータを含むVPCネットワーク設定で、セルフマネージド Kafka クラスターを作成します。

  2. データを移行する OpenSearch サービスドメインまたは OpenSearch サーバーレスコレクションを作成します。詳細については、 OpenSearch 「サービスドメインの作成」および「コレクションの作成」を参照してください。

  3. を使用して、セルフマネージドクラスターで認証を設定します AWS Secrets Manager。「シークレットのローテーション」の手順に従って、AWS Secrets Manager シークレットのローテーションを有効にします。

  4. セルフマネージド Kafka VPCにアクセスできる の ID を取得します。取り込みVPCCIDRで使用する OpenSearch を選択します。

    注記

    を使用してパイプライン AWS Management Console を作成する場合は、セルフマネージド Kafka OpenSearch を使用するには、取り込みパイプラインも VPCにアタッチする必要があります。これを行うには、ネットワーク設定セクションを検索し、「アタッチ先VPC」チェックボックスをオンにして、提供されているデフォルトオプションのいずれかCIDRから を選択するか、独自のものを選択します。RFC 1918 年ベストプラクティス で定義されているように、プライベートアドレス空間CIDRから任意の を使用できます。

    カスタム を指定するにはCIDR、ドロップダウンメニューからその他を選択します。取り込み とセルフマネージド OpenSearch の間の IP アドレスの衝突を回避するには OpenSearch、セルフマネージド OpenSearch VPC CIDRが取り込み CIDR の OpenSearch と異なることを確認してください。

  5. リソースベースのポリシーをドメインにアタッチするか、コレクションにデータアクセスポリシーをアタッチします。これらのアクセスポリシーにより、Ingestion OpenSearch は自己管理型クラスターからドメインまたはコレクションにデータを書き込むことができます。

    次のサンプルドメインアクセスポリシーでは、次のステップで作成するパイプラインロールがドメインにデータを書き込むことを許可します。必ず独自の resourceで を更新してくださいARN。

    { "Version": "2012-10-17", "Statement": [ { "Effect": "Allow", "Principal": { "AWS": "arn:aws:iam::{pipeline-account-id}:role/pipeline-role" }, "Action": [ "es:DescribeDomain", "es:ESHttp*" ], "Resource": [ "arn:aws:es:{region}:{account-id}:domain/domain-name" ] } ] }

    コレクションまたはドメインへの書き込みデータにアクセスするための正しいアクセス許可を持つ IAMロールを作成するには、「ドメインに必要なアクセス許可」および「コレクションに必要なアクセス許可」を参照してください。

ステップ 1: パイプラインロールを設定する

パイプラインの前提条件を設定したら、パイプライン設定で使用するパイプラインロールを設定し、ロールに次のアクセス許可を追加します。

{ "Version": "2012-10-17", "Statement": [ { "Sid": "SecretsManagerReadAccess", "Effect": "Allow", "Action": [ "secretsmanager:GetSecretValue" ], "Resource": ["arn:aws:secretsmanager:{region}:{account-id}:secret:secret-name"] }, { "Effect": "Allow", "Action": [ "ec2:AttachNetworkInterface", "ec2:CreateNetworkInterface", "ec2:CreateNetworkInterfacePermission", "ec2:DeleteNetworkInterface", "ec2:DeleteNetworkInterfacePermission", "ec2:DetachNetworkInterface", "ec2:DescribeNetworkInterfaces" ], "Resource": [ "arn:aws:ec2:*:{account-id}:network-interface/*", "arn:aws:ec2:*:{account-id}:subnet/*", "arn:aws:ec2:*:{account-id}:security-group/*" ] }, { "Effect": "Allow", "Action": [ "ec2:DescribeDhcpOptions", "ec2:DescribeRouteTables", "ec2:DescribeSecurityGroups", "ec2:DescribeSubnets", "ec2:DescribeVpcs", "ec2:Describe*" ], "Resource": "*" }, { "Effect": "Allow", "Action": [ "ec2:CreateTags" ], "Resource": "arn:aws:ec2:*:*:network-interface/*", "Condition": { "StringEquals": { "aws:RequestTag/OSISManaged": "true" } } } ] }

パイプラインはこれらのEC2アクセス許可を使用して OpenSearch でネットワークインターフェイスを作成および削除するため、取り込みパイプラインの作成に使用するIAMロールに対して上記の Amazon アクセス許可を指定する必要がありますVPC。パイプラインは、このネットワークインターフェイスを介してのみ Kafka クラスターにアクセスできます。

ステップ 2: パイプラインを作成する

その後、Kafka OpenSearch をソースとして指定する Ingestion パイプラインを次のように設定できます。

複数の OpenSearch サービスドメインをデータの送信先として指定できます。この機能により、受信データを複数の OpenSearch サービスドメインに条件付きでルーティングまたはレプリケーションできます。

ソース Confluent Kafka クラスターから OpenSearch サーバーレスVPCコレクションにデータを移行することもできます。パイプライン設定内でネットワークアクセスポリシーを指定していることを確認します。Confluent スキーマレジストリを使用して、Confluent スキーマを定義できます。

version: "2" kafka-pipeline: source: kafka: encryption: type: "ssl" topics: - name: "topic-name" group_id: "group-id" bootstrap_servers: - "bootstrap-server.us-west-2.aws.private.confluent.cloud:9092" authentication: sasl: plain: username: ${aws_secrets:confluent-kafka-secret:username} password: ${aws_secrets:confluent-kafka-secret:password} schema: type: confluent registry_url: https://my-registry.us-west-2.aws.confluent.cloud api_key: "${{aws_secrets:schema-secret:schema_registry_api_key}}" api_secret: "${{aws_secrets:schema-secret:schema_registry_api_secret}}" basic_auth_credentials_source: "USER_INFO" sink: - opensearch: hosts: ["https://search-mydomain.us-west-2.es.amazonaws.com"] aws: sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" region: "us-west-2" index: "confluent-index" extension: aws: secrets: confluent-kafka-secret: secret_id: "my-kafka-secret" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role" schema-secret: secret_id: "my-self-managed-kafka-schema" region: "us-west-2" sts_role_arn: "arn:aws:iam::{account-id}:role/pipeline-role"

事前設定された設計図を使用して、このパイプラインを作成できます。詳細については、「ブループリントを使用したパイプラインの作成」を参照してください。