Apache Spark のデータソースコネクタを操作する
一部の Athena データソースコネクタは、Spark DSV2 コネクタとして利用できます。Spark DSV2 コネクタ名には -dsv2
サフィックスが付いています (例: athena-dynamodb-dsv2
)。
現在利用可能な DSV2 コネクタ、その Spark .format()
クラス名、および対応する Amazon Athena Federated Query ドキュメントへのリンクを次に示します。
DSV2 コネクタ | Spark .format() クラス名 | ドキュメント |
---|---|---|
athena-cloudwatch-dsv2 | com.amazonaws.athena.connectors.dsv2.cloudwatch.CloudwatchTableProvider |
CloudWatch |
athena-cloudwatch-metrics-dsv2 | com.amazonaws.athena.connectors.dsv2.cloudwatch.metrics.CloudwatchMetricsTableProvider |
CloudWatch メトリクス |
athena-aws-cmdb-dsv2 | com.amazonaws.athena.connectors.dsv2.aws.cmdb.AwsCmdbTableProvider |
CMDB |
athena-dynamodb-dsv2 | com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider |
DynamoDB |
DSV2 コネクタ用の .jar
ファイルをダウンロードするには、Amazon Athena Query Federation DSV2<バージョン>
のリリース」、「アセット」セクションを参照してください。
Spark に対する jar を指定する
Spark で Athena DSV2 コネクタを使用するには、使用している Spark 環境にコネクタの .jar
ファイルを送信します。次のセクションでは、特定のケースについて説明します。
Athena for Spark
Amazon Athena for Apache Spark にカスタム .jar
ファイルとカスタム設定を追加するための情報については、「Spark プロパティを使用してカスタム設定を指定する」を参照してください。
一般的な Spark
コネクタ .jar
ファイルを Spark に渡すには、次の例のように、spark-submit
コマンドを使用し、--jars
オプションで .jar
ファイルを指定します。
spark-submit \ --deploy-mode cluster \ --jars https://github.com/awslabs/aws-athena-query-federation-dsv2/releases/download/
some_version
/athena-dynamodb-dsv2-some_version
.jar
Amazon EMR Spark
Amazon EMR で --jars
パラメータを指定して spark-submit
コマンドを実行するには、Amazon EMR Spark クラスターにステップを追加する必要があります。Amazon EMR での spark-submit
の使用方法の詳細については、「Amazon EMR リリースガイド」の「Spark ステップを追加する」を参照してください。
AWS Glue ETL Spark
AWS Glue ETL の場合、.jar
ファイルの GitHub.com URL を aws glue start-job-run
コマンドの --extra-jars
引数に渡すことができます。AWS Glue ドキュメントでは --extra-jars
パラメータが Amazon S3 パスを取るように説明されていますが、パラメータは HTTPS URL を取ることもできます。詳細については、「AWS Glue デベロッパーガイド」の「ジョブパラメータリファレンス」を参照してください。
Spark でのコネクタをクエリする
Apache Spark で既存の Athena フェデレーテッドクエリと同等のクエリを送信するには、spark.sql()
関数を使用します。たとえば、Apache Spark で使用する次の Athena クエリがあるとします。
SELECT somecola, somecolb, somecolc FROM ddb_datasource.some_schema_or_glue_database.some_ddb_or_glue_table WHERE somecola > 1
Amazon Athena DynamoDB DSV2 コネクタを使用して Spark で同じクエリを実行するには、次のコードを使用します。
dynamoDf = (spark.read .option("athena.connectors.schema", "some_schema_or_glue_database") .option("athena.connectors.table", "some_ddb_or_glue_table") .format("com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider") .load()) dynamoDf.createOrReplaceTempView("ddb_spark_table") spark.sql(''' SELECT somecola, somecolb, somecolc FROM ddb_spark_table WHERE somecola > 1 ''')
パラメータを指定します
DSV2 バージョンの Athena データソースコネクタは、対応する Athena データソースコネクタと同じパラメータを使用します。パラメータの情報については、対応する Athena データソースコネクタのドキュメントを参照してください。
PySpark コードでは、次の構文を使用してパラメータを設定します。
spark.read.option("athena.connectors.conf.
parameter
", "value
")
たとえば、次のコードは Amazon Athena DynamoDB コネクタ disable_projection_and_casing
パラメータを always
に設定します。
dynamoDf = (spark.read .option("athena.connectors.schema", "some_schema_or_glue_database") .option("athena.connectors.table", "some_ddb_or_glue_table") .option("athena.connectors.conf.disable_projection_and_casing", "always") .format("com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider") .load())