Amazon Athena の Apache Kafka コネクタ - Amazon Athena

Amazon Athena の Apache Kafka コネクタ

Apache Kafka 用の Amazon Athena コネクタを使用して、Amazon Athena で Apache Kafka トピックに対して SQL クエリを実行できます。このコネクタを使用すると、Apache Kafka トピックをテーブルとして、メッセージを Athena の行として表示できます。

このコネクタは、Glue 接続を使用して Glue の設定プロパティを一元化しません。接続設定は Lambda を介して行われます。

前提条件

Athena コンソールまたは AWS Serverless Application Repository を使用して AWS アカウント にコネクタをデプロイします。詳細については、「データソース接続を作成する」または「AWS Serverless Application Repository を使用してデータソースコネクタをデプロイする」を参照してください。

制限

  • DDL の書き込みオペレーションはサポートされていません。

  • 関連性のある Lambda 上限値。詳細については、AWS Lambda デベロッパーガイドLambda のクォータを参照してください。

  • フィルター条件における日付とタイムスタンプのデータ型は、適切なデータ型にキャストする必要があります。

  • 日付とタイムスタンプのデータ型は、CSV ファイルタイプではサポートされていないため、varchar 値として扱われます。

  • ネストされた JSON フィールドへのマッピングはサポートされていません。コネクタは最上位のフィールドのみをマッピングします。

  • コネクタは複合型をサポートしていません。複合型は文字列として解釈されます。

  • 複合型の JSON 値を抽出または処理するには、Athena に用意されている JSON 関連の関数を使用してください。詳細については、「文字列から JSON データを抽出する」を参照してください。

  • コネクタは、Kafka メッセージメタデータへのアクセスをサポートしていません。

用語

  • メタデータハンドラー – データベースインスタンスからメタデータを取得する Lambda ハンドラー。

  • レコードハンドラー – データベースインスタンスからデータレコードを取得する Lambda ハンドラー。

  • 複合ハンドラー — データベースインスタンスからメタデータとデータレコードの両方を取得する Lambda ハンドラー。

  • Kafka エンドポイント - Kafka インスタンスへの接続を確立するテキスト文字列。

クラスター間の互換性

Kafka コネクタは、次のクラスタータイプで使用できます。

  • スタンドアロン Kafka - Kafka への直接の接続 (認証または非認証)。

  • Confluent — Confluent Kafka への直接接続。Confluent Kafka データに Athena を使用する方法については、「AWS Business Intelligence Blog」の「Visualize Confluent data in Amazon QuickSight using Amazon Athena」を参照してください。

Confluent への接続

Confluent への接続には以下の手順が必要です。

  1. Confluent から API キーを生成する。

  2. Confluent API キーのユーザー名とパスワードを入力する。AWS Secrets Manager

  3. Kafka コネクターの secrets_manager_secret 環境変数にシークレット名を指定します。

  4. このドキュメントの Kafka コネクタのセットアップ セクションの手順に従ってください。

サポートされた認証方法

コネクタでは、次の認証方法がサポートされています。

  • SSL

  • SASL/SCRAM

  • SASL/PLAIN

  • SASL/PLAINTEXT

  • NO_AUTH

  • セルフマネージド型の Kafka と Confluent Platform — SSL、SASL/SCRAM、SASL/プレーンテキスト、NO_AUTH

  • セルフマネージド型の Kafka および Confluent Cloud — SASL/PLAIN

詳細については、「Athena Kafka コネクタの認証を設定」を参照してください。

対応する入力データ形式

コネクタは、次の入力データ形式をサポートします。

  • JSON

  • CSV

  • AVRO

  • PROTOBUF (PROTOCOL BUFFERS)

パラメータ

このセクションのパラメータを使用して Athena Kafka コネクタを設定します。

  • auth_type - クラスターの認証タイプを指定します。コネクタは、次のタイプの認証をサポートしています。

    • NO_AUTH - Kafka に直接接続します (たとえば、認証を使用しない EC2 インスタンスにデプロイされた Kafka クラスターなど)。

    • SASL_SSL_PLAIN - このメソッドは、SASL_SSL セキュリティプロトコルと PLAIN SASL メカニズムを使用します。詳細については、Apache Kafka ドキュメントの「SASL 設定」を参照してください。

    • SASL_PLAINTEXT_PLAIN - このメソッドは、SASL_PLAINTEXT セキュリティプロトコルと PLAIN SASL メカニズムを使用します。詳細については、Apache Kafka ドキュメントの「SASL 設定」を参照してください。

    • SASL_SSL_SCRAM_SHA512 - この認証タイプを使用して Apache Kafka クラスターへのアクセスを制御できます。このメソッドでは、ユーザー名とパスワードを AWS Secrets Manager に保存します。シークレットは Kafka クラスターに関連付けられている必要があります。詳細については、Apache Kafka ドキュメントの「SASL/SCRAM を使用する認証」を参照してください。

    • SASL_PLAINTEXT_SCRAM_SHA512 - このメソッドは、SASL_PLAINTEXT セキュリティプロトコルと SCRAM_SHA512 SASL メカニズムを使用します。このメソッドでは、AWS Secrets Manager に保存したユーザー名とパスワードを使用します。詳細については、Apache Kafka ドキュメントの「SASL 設定」セクションを参照してください。

    • SSL - SSL 認証では、キーストアとトラストストアのファイルを使用して Apache Kafka クラスターに接続します。トラストストアファイルとキーストアファイルを生成し、それらを Amazon S3 バケットにアップロードして、コネクタをデプロイするときに Amazon S3 への参照を提供する必要があります。キーストア、トラストストア、および SSL キーは AWS Secrets Manager に保存されます。コネクタをデプロイする際に、クライアントは AWS のシークレットキーを提供する必要があります。詳細については、Apache Kafka ドキュメントの「SSL を使用した暗号化と認証」を参照してください。

      詳細については、「Athena Kafka コネクタの認証を設定」を参照してください。

  • certificates_s3_reference - 証明書 (キーストアとトラストストアのファイル) を含む Amazon S3 の場所。

  • disable_spill_encryption – (オプション) True に設定されている場合、スピルに対する暗号化を無効にします。デフォルト値は False です。この場合、S3 にスピルされたデータは、AES-GCM を使用して (ランダムに生成されたキー、または KMS により生成したキーにより) 暗号化されます。スピル暗号化を無効にすると、特にスピルされる先でサーバー側の暗号化を使用している場合に、パフォーマンスが向上します。

  • kafka_endpoint - Kafka に提供するエンドポイントの詳細。

  • schema_registry_url – スキーマレジストリの URL アドレス (例: http://schema-registry.example.org:8081)。AVRO および PROTOBUF データ形式に適用されます。

  • secrets_manager_secret - 認証情報が保存されている AWS シークレットの名前。

  • スピルパラメータ - Lambda 関数は、メモリに収まらないデータを Amazon S3 に一時的に保存 (「スピル」) します。同一の Lambda 関数によってアクセスされるすべてのデータベースインスタンスは、同じ場所にスピルします。次の表のパラメータを使用して、スピル場所を指定します。

    パラメータ 説明
    spill_bucket 必須。Lambda 関数がデータをスピルできる Amazon S3 バケットの名前。
    spill_prefix 必須。Lambda 関数がデータをスピルさせることができるスピルバケット内のプリフィックス。
    spill_put_request_headers (オプション) スピルに使用される Amazon S3 の putObject リクエスト (例:{"x-amz-server-side-encryption" : "AES256"}) における、リクエストヘッダーと値に関する JSON でエンコードされたマッピング。利用可能な他のヘッダーについては、「Amazon Simple Storage Service API リファレンス」の「PutObject」を参照してください。
  • サブネット ID — Lambda 関数がデータソースへのアクセスに使用できるサブネットに対応する 1 つ以上のサブネット ID。

    • パブリックな Kafka クラスターまたは標準の Confluent Cloud クラスタ — コネクターを NAT ゲートウェイのあるプライベートサブネットに関連付けます。

    • プライベート接続の Confluent Cloud クラスタ — Confluent Cloud クラスタへのルートがあるプライベートサブネットにコネクタを関連付けます。

      • AWS Transit Gateway の場合、サブネットは Confluent Cloud が使用するのと同じトランジットゲートウェイに接続されている VPC 内にある必要があります。

      • VPC ピアリングの場合、サブネットは Confluent Cloud VPC にピアリングされている VPC 内にある必要があります。

      • AWS PrivateLinkの場合、サブネットは Confluent Cloud に接続する VPC エンドポイントへのルートがある VPC 内にある必要があります。

注記

プライベートリソースにアクセスするためにコネクタを VPC にデプロイし、Confluent などのパブリックにアクセス可能なサービスにも接続する場合は、NAT ゲートウェイを持つプライベートサブネットにコネクタを関連付ける必要があります。詳細については、Amazon VPC ユーザーガイドの「NAT ゲートウェイ」を参照してください。

サポートされるデータ型

次の表に、Kafka と Apache Arrow に対応するデータ型を示します。

Kafka Arrow
CHAR VARCHAR
VARCHAR VARCHAR
TIMESTAMP ミリ秒
DATE DAY
BOOLEAN BOOL
SMALLINT SMALLINT
INTEGER INT
BIGINT BIGINT
DECIMAL FLOAT8
DOUBLE FLOAT8

パーティションと分割

Kafka トピックはパーティションに分割されています。各パーティションは順序付けられています。パーティション内の各メッセージには、オフセットと呼ばれるインクリメンタル ID があります。各 Kafka パーティションは、並列処理のためにさらに複数のスプリットに分割されます。データは、Kafka クラスターに設定された保持期間中、利用できます。

ベストプラクティス

ベストプラクティスとして、次の例のように、Athena にクエリを実行するときは、述語プッシュダウンを使用してください。

SELECT * FROM "kafka_catalog_name"."glue_schema_registry_name"."glue_schema_name" WHERE integercol = 2147483647
SELECT * FROM "kafka_catalog_name"."glue_schema_registry_name"."glue_schema_name" WHERE timestampcol >= TIMESTAMP '2018-03-25 07:30:58.878'

Kafka コネクタのセットアップ

コネクタを使用する前に、Apache Kafka クラスターをセットアップし、AWS Glue スキーマレジストリを使用してスキーマを定義し、コネクタの認証を設定する必要があります。

AWS Glue スキーマレジストリを使用する際は、次の点に注意してください。

  • AWS Glue スキーマレジストリの [Description] (説明) フィールドのテキストに文字列 {AthenaFederationKafka} が含まれていることを確認してください。このマーカー文字列は、Amazon Athena Kafka コネクタで使用する AWS Glue レジストリに必要です。

  • 最高のパフォーマンスを得るには、 データベース名とテーブル名には小文字のみを使用してください。大文字と小文字が混在すると、コネクタは大文字と小文字を区別しない検索を実行するため、計算量が多くなります。

Apache Kafka 環境と AWS Glue スキーマレジストリをセットアップするには
  1. Apache Kafka 環境をセットアップします。

  2. JSON 形式の Kafka トピック記述ファイル (つまり、そのスキーマ) を AWS Glue スキーマレジストリにアップロードします。詳細については、AWS Glue デベロッパーガイドの「AWS Glue スキーマ登録と連携する」を参照してください。

  3. AWS Glue スキーマレジストリでスキーマを定義する際に AVRO または PROTOBUF データ形式を使用するには:

    • [スキーマ名] で、元と同じように大文字および小文字を使用して Kafka トピック名を入力します。

    • [データ形式] で、[Apache Avro] または [プロトコルバッファ] を選択します。

    スキーマの例については、次のセクションを参照してください。

スキーマを AWS Glue スキーマレジストリにアップロードする際は、このセクションの例の形式を使用してください。

JSON タイプのスキーマの例

次の例では、AWS Glue スキーマレジストリに作成されるスキーマは、dataFormat の値として json を指定し、topicName の値として datatypejson を使用します。

注記

topicName の値は、Kafka のトピック名と同じ大文字と小文字を使用する必要があります。

{ "topicName": "datatypejson", "message": { "dataFormat": "json", "fields": [ { "name": "intcol", "mapping": "intcol", "type": "INTEGER" }, { "name": "varcharcol", "mapping": "varcharcol", "type": "VARCHAR" }, { "name": "booleancol", "mapping": "booleancol", "type": "BOOLEAN" }, { "name": "bigintcol", "mapping": "bigintcol", "type": "BIGINT" }, { "name": "doublecol", "mapping": "doublecol", "type": "DOUBLE" }, { "name": "smallintcol", "mapping": "smallintcol", "type": "SMALLINT" }, { "name": "tinyintcol", "mapping": "tinyintcol", "type": "TINYINT" }, { "name": "datecol", "mapping": "datecol", "type": "DATE", "formatHint": "yyyy-MM-dd" }, { "name": "timestampcol", "mapping": "timestampcol", "type": "TIMESTAMP", "formatHint": "yyyy-MM-dd HH:mm:ss.SSS" } ] } }

CSV タイプスキーマの例

次の例では、AWS Glue スキーマレジストリに作成されるスキーマは、dataFormat の値として csv を指定し、topicName の値として datatypecsvbulk を使用します。topicName の値は、Kafka のトピック名と同じ大文字と小文字を使用する必要があります。

{ "topicName": "datatypecsvbulk", "message": { "dataFormat": "csv", "fields": [ { "name": "intcol", "type": "INTEGER", "mapping": "0" }, { "name": "varcharcol", "type": "VARCHAR", "mapping": "1" }, { "name": "booleancol", "type": "BOOLEAN", "mapping": "2" }, { "name": "bigintcol", "type": "BIGINT", "mapping": "3" }, { "name": "doublecol", "type": "DOUBLE", "mapping": "4" }, { "name": "smallintcol", "type": "SMALLINT", "mapping": "5" }, { "name": "tinyintcol", "type": "TINYINT", "mapping": "6" }, { "name": "floatcol", "type": "DOUBLE", "mapping": "7" } ] } }

AVRO 型スキーマの例

次の例は、AWS Glue スキーマレジストリで AVRO ベースのスキーマを作成するために使用されます。AWS Glue スキーマレジストリでスキーマを定義する場合、[スキーマ名] で元と同じように大文字および小文字を使用して Kafka トピック名を入力し、[データ形式][Apache Avro] を選択します。この情報はレジストリで直接指定するため、dataformat および topicName フィールドは必要ありません。

{ "type": "record", "name": "avrotest", "namespace": "example.com", "fields": [{ "name": "id", "type": "int" }, { "name": "name", "type": "string" } ] }

PROTOBUF タイプのスキーマの例

次の例は、AWS Glue スキーマレジストリで PROTOBUF ベースのスキーマを作成するために使用されます。AWS Glue スキーマレジストリでスキーマを定義する場合、[スキーマ名] で元と同じように大文字および小文字を使用して Kafka トピック名を入力し、[データ形式][Protocol Buffers] を選択します。この情報はレジストリで直接指定するため、dataformat および topicName フィールドは必要ありません。1 行目はスキーマを PROTOBUF として定義します。

syntax = "proto3"; message protobuftest { string name = 1; int64 calories = 2; string colour = 3; }

AWS Glue スキーマレジストリにレジストリとスキーマを追加する方法の詳細については、AWS Glue ドキュメントの「スキーマレジストリの開始方法」を参照してください。

Athena Kafka コネクタの認証を設定

SSL、SASL/SCRAM、SASL/PLAIN、SASL/PLAINTEXT など、Apache Kafka クラスターへの認証にはさまざまなメソッドを使用できます。

次の表は、コネクタの認証タイプと、それぞれのセキュリティプロトコルおよび SASL メカニズムを示しています。詳細については、Apache Kafka ドキュメントの「セキュリティ」セクションを参照してください。

auth_type security.protocol sasl.mechanism クラスタータイプの互換性
SASL_SSL_PLAIN SASL_SSL PLAIN
  • セルフマネージド型の Kafka

  • Confluent Platform

  • Confluent Cloud

SASL_PLAINTEXT_PLAIN SASL_PLAINTEXT PLAIN
  • セルフマネージド型の Kafka

  • Confluent Platform

SASL_SSL_SCRAM_SHA512 SASL_SSL SCRAM-SHA-512
  • セルフマネージド型の Kafka

  • Confluent Platform

SASL_PLAINTEXT_SCRAM_SHA512 SASL_PLAINTEXT SCRAM-SHA-512
  • セルフマネージド型の Kafka

  • Confluent Platform

SSL SSL 該当なし
  • セルフマネージド型の Kafka

  • Confluent Platform

SSL

クラスターが SSL 認証されている場合、トラストストアとキーストアファイルを生成して Amazon S3 バケットにアップロードする必要があります。コネクタをデプロイする際に、この Amazon S3 リファレンスを提供する必要があります。キーストア、トラストストア、および SSL キーは AWS Secrets Manager に保存されます。コネクタをデプロイする際に、AWS のシークレットキーを提供します。

Secrets Manager でのシークレットの作成については、「AWS Secrets Manager シークレットを作成する」を参照してください。

この認証タイプを使用するには、以下の表の説明どおりに環境変数を設定します。

パラメータ
auth_type SSL
certificates_s3_reference 証明書を格納する Amazon S3 の場所。
secrets_manager_secret AWS シークレットキーの名前。

Secrets Manager でシークレットを作成した後、Secrets Manager コンソールでそのシークレットを確認できます。

Secrets Manager で自分のシークレットを確認するには
  1. Secrets Manager のコンソール (https://console.aws.amazon.com/secretsmanager/) を開きます。

  2. ナビゲーションペインで [Secrets] (シークレット) を選択します。

  3. [Secrets] (シークレット) ページで、自分のシークレットを選択します。

  4. 自分のシークレットの詳細ページで、[Retrieve secret value] (シークレットの値を取得する) を選択します。

    次の画像は、3 組のキー/値のペア (keystore_passwordtruststore_passwordssl_key_password) を持つシークレットの例を示しています。

    Secrets Manager で SSL シークレットを取得する

Kafka で SSL を使用する際の詳細については、Apache Kafka ドキュメントの「SSL を使用した暗号化と認証」を参照してください。

SASL/SCRAM

クラスターが SCRAM 認証を使用している場合は、コネクタをデプロイするときに、クラスターに関連付けられている Secrets Manager キーを指定します。ユーザーの AWS 認証情報 (シークレットキーとアクセスキー) は、クラスターとの認証に使用されます。

下表のように環境変数を設定します。

パラメータ
auth_type SASL_SSL_SCRAM_SHA512
secrets_manager_secret AWS シークレットキーの名前。

次の画像は、Secrets Manager コンソールにある 2 つのキー/値のペア (1 つは username 用、もう 1 つは password 用) のシークレットの例を示しています。

Secrets Manager での SCRAM シークレットの取得

Kafka で SASL/SCRAM を使用する際の詳細については、Apache Kafka ドキュメントの「SASL/SCRAM 認証の使用」を参照してください。

ライセンス情報

このコネクタを使用することにより、pom.xml ファイル内のリストにある、サードパーティのコンポーネントが使用されることを承認し、 GitHub.com にある LICENSE.txt ファイルに記載されている、個別のサードパーティライセンスの使用条件に同意したとみなされます。

追加リソース

このコネクタに関するその他の情報については、GitHub.com で対応するサイトを参照してください。