使用适用于 Apache Spark 的数据来源连接器
某些 Athena 数据来源连接器可用作 Spark DSV2 连接器。Spark DSV2 连接器名称带有 -dsv2
后缀(例如 athena-dynamodb-dsv2
)。
下面是当前可用的 DSV2 连接器、其 Spark .format()
类名称以及指向其相应的 Amazon Athena 联合查询文档的链接:
DSV2 连接器 | Spark .format() 类名称 | 文档 |
---|---|---|
athena-cloudwatch-dsv2 | com.amazonaws.athena.connectors.dsv2.cloudwatch.CloudwatchTableProvider |
CloudWatch |
athena-cloudwatch-metrics-dsv2 | com.amazonaws.athena.connectors.dsv2.cloudwatch.metrics.CloudwatchMetricsTableProvider |
CloudWatch 指标 |
athena-aws-cmdb-dsv2 | com.amazonaws.athena.connectors.dsv2.aws.cmdb.AwsCmdbTableProvider |
CMDB |
athena-dynamodb-dsv2 | com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider |
DynamoDB |
要下载 DSV2 连接器的 .jar
文件,请访问 Amazon Athena Query Federation DSV2 <version>
、资产部分。
为 Spark 指定 jar
要将 Athena DSV2 连接器与 Spark 配合使用,将连接器的 .jar
文件提交到您正在使用的 Spark 环境。以下几节介绍了具体的情况。
Athena for Spark
有关向 Amazon Athena for Apache Spark 添加自定义 .jar
文件和自定义配置的信息,请参阅 使用 Spark 属性指定自定义配置。
常规 Spark
要将连接器 .jar
文件传递给 Spark,使用 spark-submit
命令并在 --jars
选项中指定 .jar
文件,如以下示例所示:
spark-submit \ --deploy-mode cluster \ --jars https://github.com/awslabs/aws-athena-query-federation-dsv2/releases/download/
some_version
/athena-dynamodb-dsv2-some_version
.jar
Amazon EMR Spark
要在 Amazon EMR 上运行包含 --jars
参数的 spark-submit
命令,必须向 Amazon EMR Spark 集群添加一个步骤。有关如何在 Amazon EMR 上使用 spark-submit
的详细信息,请参阅《Amazon EMR 版本指南》中的添加 Spark 步骤。
AWS Glue ETL Spark
对于 AWS Glue ETL,可以将 .jar
文件的 GitHub.com URL 传递到 aws glue start-job-run
命令的 --extra-jars
参数。AWS Glue 文档将 --extra-jars
参数描述为采用 Amazon S3 路径,但该参数也可以采用 HTTPS URL。有关更多信息,请参阅《AWS Glue 开发人员指南》中的作业参数参考。
在 Spark 上查询连接器
要在 Apache Spark 上提交现有 Athena 联合查询的等效项,使用 spark.sql()
函数。例如,假设您拥有以下要在 Apache Spark 上使用的 Athena 查询。
SELECT somecola, somecolb, somecolc FROM ddb_datasource.some_schema_or_glue_database.some_ddb_or_glue_table WHERE somecola > 1
要使用 Amazon Athena DynamoDB DSV2 连接器在 Spark 上执行相同的查询,使用以下代码:
dynamoDf = (spark.read .option("athena.connectors.schema", "some_schema_or_glue_database") .option("athena.connectors.table", "some_ddb_or_glue_table") .format("com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider") .load()) dynamoDf.createOrReplaceTempView("ddb_spark_table") spark.sql(''' SELECT somecola, somecolb, somecolc FROM ddb_spark_table WHERE somecola > 1 ''')
指定参数
Athena 数据来源连接器的 DSV2 版本使用与相应的 Athena 数据来源连接器相同的参数。有关参数信息,请参阅相应的 Athena 数据来源连接器的文档。
在 Pyspark 代码中,使用以下语法配置参数。
spark.read.option("athena.connectors.conf.
parameter
", "value
")
例如,以下代码将 Amazon Athena DynamoDB 连接器 disable_projection_and_casing
参数设置为 always
。
dynamoDf = (spark.read .option("athena.connectors.schema", "some_schema_or_glue_database") .option("athena.connectors.table", "some_ddb_or_glue_table") .option("athena.connectors.conf.disable_projection_and_casing", "always") .format("com.amazonaws.athena.connectors.dsv2.dynamodb.DDBTableProvider") .load())