Apache Spark のデータソースコネクタを操作する - Amazon Athena

Apache Spark のデータソースコネクタを操作する

一部の Athena データソースコネクタは、Spark DSV2 コネクタとして利用できます。Spark DSV2 コネクタ名には -dsv2 サフィックスが付いています (例: athena-dynamodb-dsv2)。

現在利用可能な DSV2 コネクタ、その Spark .format() クラス名、および対応する Amazon Athena Federated Query ドキュメントへのリンクを次に示します。

DSV2 コネクタ Spark .format() クラス名 ドキュメント
athena-cloudwatch-dsv2 com.amazonaws.athena.connectors.dsv2.cloudwatch.CloudwatchTableProvider CloudWatch
athena-cloudwatch-metrics-dsv2 com.amazonaws.athena.connectors.dsv2.cloudwatch.metrics.CloudwatchMetricsTableProvider CloudWatch メトリクス
athena-aws-cmdb-dsv2 com.amazonaws.athena.connectors.dsv2.aws.cmdb.AwsCmdbTableProvider CMDB
athena-dynamodb-dsv2 com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider DynamoDB

DSV2 コネクタ用の .jar ファイルをダウンロードするには、Amazon Athena Query Federation DSV2 の GitHub ページにアクセスし、「リリース」、「<バージョン> のリリース」、「アセット」セクションを参照してください。

Spark に対する jar を指定する

Spark で Athena DSV2 コネクタを使用するには、使用している Spark 環境にコネクタの .jar ファイルを送信します。次のセクションでは、特定のケースについて説明します。

Athena for Spark

Amazon Athena for Apache Spark にカスタム .jar ファイルとカスタム設定を追加するための情報については、「Spark プロパティを使用してカスタム設定を指定する」を参照してください。

一般的な Spark

コネクタ .jar ファイルを Spark に渡すには、次の例のように、spark-submit コマンドを使用し、--jars オプションで .jar ファイルを指定します。

spark-submit \ --deploy-mode cluster \ --jars https://github.com/awslabs/aws-athena-query-federation-dsv2/releases/download/some_version/athena-dynamodb-dsv2-some_version.jar

Amazon EMR Spark

Amazon EMR で --jars パラメータを指定して spark-submit コマンドを実行するには、Amazon EMR Spark クラスターにステップを追加する必要があります。Amazon EMR での spark-submit の使用方法の詳細については、「Amazon EMR リリースガイド」の「Spark ステップを追加する」を参照してください。

AWS Glue ETL Spark

AWS Glue ETL の場合、.jar ファイルの GitHub.com URL を aws glue start-job-run コマンドの --extra-jars 引数に渡すことができます。AWS Glue ドキュメントでは --extra-jars パラメータが Amazon S3 パスを取るように説明されていますが、パラメータは HTTPS URL を取ることもできます。詳細については、「AWS Glue デベロッパーガイド」の「ジョブパラメータリファレンス」を参照してください。

Spark でのコネクタをクエリする

Apache Spark で既存の Athena フェデレーテッドクエリと同等のクエリを送信するには、spark.sql() 関数を使用します。たとえば、Apache Spark で使用する次の Athena クエリがあるとします。

SELECT somecola, somecolb, somecolc FROM ddb_datasource.some_schema_or_glue_database.some_ddb_or_glue_table WHERE somecola > 1

Amazon Athena DynamoDB DSV2 コネクタを使用して Spark で同じクエリを実行するには、次のコードを使用します。

dynamoDf = (spark.read .option("athena.connectors.schema", "some_schema_or_glue_database") .option("athena.connectors.table", "some_ddb_or_glue_table") .format("com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider") .load()) dynamoDf.createOrReplaceTempView("ddb_spark_table") spark.sql(''' SELECT somecola, somecolb, somecolc FROM ddb_spark_table WHERE somecola > 1 ''')

パラメータを指定します

DSV2 バージョンの Athena データソースコネクタは、対応する Athena データソースコネクタと同じパラメータを使用します。パラメータの情報については、対応する Athena データソースコネクタのドキュメントを参照してください。

PySpark コードでは、次の構文を使用してパラメータを設定します。

spark.read.option("athena.connectors.conf.parameter", "value")

たとえば、次のコードは Amazon Athena DynamoDB コネクタ disable_projection_and_casing パラメータを always に設定します。

dynamoDf = (spark.read .option("athena.connectors.schema", "some_schema_or_glue_database") .option("athena.connectors.table", "some_ddb_or_glue_table") .option("athena.connectors.conf.disable_projection_and_casing", "always") .format("com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider") .load())