AWS GlueScala API GlueContext - AWS Glue

本文属于机器翻译版本。若本译文内容与英语原文存在差异,则一律以英文原文为准。

AWS GlueScala API GlueContext

程序包:com.amazonaws.services.glue

class GlueContext extends SQLContext(sc) ( @transient val sc : SparkContext, val defaultSourcePartitioner : PartitioningStrategy )

GlueContext 是在以下位置读取和写入 DynamicFrame 的入口点:Amazon Simple Storage Service(Amazon S3)、 AWS Glue 数据目录、JDBC 等。此类提供实用程序函数,用于创建可用来读取和写入 DynamicFrame数据源特性DataSink 对象。

如果从源创建的分区数小于分区的最小阈值(默认值为 10),您还可以使用 GlueContext 设置 DynamicFrame 中的目标分区数量(默认值为 20)。

def 列 addIngestionTime

def addIngestionTimeColumns( df : DataFrame, timeGranularity : String = "") : dataFrame

将提取时间列(例如 ingest_yearingest_monthingest_dayingest_houringest_minute)附加到输入 DataFrame。当您指定以 Amazon S3 为目标的数据目录表时,AWS Glue 生成的脚本中会自动生成此函数。此函数使用输出表上的提取时间列自动更新分区。这允许根据提取时间自动对输出数据进行分区,而不需要在输入数据中显示提取时间列。

  • dataFrame – 提取时间列要附加到的 dataFrame

  • timeGranularity – 时间列的粒度。有效值为“day”、“hour”和“minute”。例如,如果“hour”传递到函数,原始 dataFrame 将附加“ingest_year”、“ingest_month”、“ingest_day”和“ingest_hour”时间列。

在附加时间粒度列后返回数据框。

例如:

glueContext.addIngestionTimeColumns(dataFrame, "hour")

def createDataFrame FromOptions

def createDataFrameFromOptions( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

返回一个使用指定连接和格式创建的 DataFrame。此功能只能用于 AWS Glue 直播源。

  • connectionType – 流式传输连接类型。有效值包括 kinesiskafka

  • connectionOptions – 连接选项,不同于 Kinesis 和 Kafka。您可以在 AWS Glue for Spark 中适用于 ETL 的连接类型和选项 找到每个流式传输数据源的所有连接选项列表。请注意流式传输连接选项的以下差异:

    • Kinesis 流式传输源需要 streamARNstartingPositioninferSchemaclassification

    • Kinesis 流式传输源需要 connectionNametopicNamestartingOffsetsinferSchemaclassification

  • transformationContext – 要使用的转换上下文(可选)。

  • format – 格式规范(可选)。这用于 Amazon S3 或支持多种格式的 AWS Glue 连接。有关所支持格式的信息,请参阅 AWS Glue for Spark 中的输入和输出的数据格式选项

  • formatOptions – 指定格式的格式选项。有关所支持的格式选项的信息,请参阅数据格式选项

Amazon Kinesis 流式传输源示例:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))

Kafka 流式传输源示例:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))

forEachBatch

forEachBatch(frame, batch_function, options)

将传入的 batch_function 应用于从流式传输源读取的每个微批处理。

  • frame— DataFrame 包含当前微批次的。

  • batch_function – 应用于每个微处理的函数。

  • options – 键值对集合,其中包含有关如何处理微批处理的信息。以下选项为必填:

    • windowSize – 处理每个批处理所花费的时间量。

    • checkpointLocation – 为流式传输 ETL 任务存储检查点的位置。

    • batchMaxRetries – 在该批处理失败时重试的最大次数。默认值为 3。此选项仅适用于 Glue 版本 2.0 及更高版本。

示例

glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))

def getCatalogSink

def getCatalogSink( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSink

创建一个可向在数据目录中定义的表中指定的位置写入的 DataSink

  • database – 数据目录中的数据库名称。

  • tableName – 数据目录中的表名称。

  • redshiftTmpDir – 用于某些数据接收器的临时暂存目录。设置为 默认情况下为空。

  • transformationContext – 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。

  • additionalOptions 提供给 AWS Glue 的额外选项。

  • catalogId – 正在访问的数据目录 ID(账户 ID)。当为空时,将使用调用方的默认账户 ID。

返回 DataSink

def getCatalogSource

def getCatalogSource( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" pushDownPredicate : String = " " additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSource

创建可从数据目录中的表定义读取数据的 数据源特性

  • database – 数据目录中的数据库名称。

  • tableName – 数据目录中的表名称。

  • redshiftTmpDir – 用于某些数据接收器的临时暂存目录。设置为 默认情况下为空。

  • transformationContext – 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。

  • pushDownPredicate – 筛选分区,而不必列出并读取数据集中的所有文件。有关更多信息,请参阅 使用下推谓词进行预筛选

  • additionalOptions – 可选名称/值对的集合。可能选项包括 AWS Glue for Spark 中适用于 ETL 的连接类型和选项 中列出的选项,但 endpointUrlstreamNamebootstrap.serverssecurity.protocoltopicNameclassificationdelimiter 除外。另一个支持的选项是 catalogPartitionPredicate

    catalogPartitionPredicate – 要传递目录表达式以根据索引列进行筛选。这样会将筛选下推到服务器端。有关更多信息,请参阅 AWS Glue 分区数据。请注意,push_down_predicatecatalogPartitionPredicate 使用不同的语法。前者使用 Spark SQL 标准语法,后者使用 JSQL 解析器。

  • catalogId – 正在访问的数据目录 ID(账户 ID)。当为空时,将使用调用方的默认账户 ID。

返回 DataSource

流式传输源示例

val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()

def getJDBCSink

def getJDBCSink( catalogConnection : String, options : JsonOptions, redshiftTmpDir : String = "", transformationContext : String = "", catalogId: String = null ) : DataSink

创建一个可向在数据目录中的 Connection 对象指定的 JDBC 数据库写入的 DataSinkConnection 对象具有用于连接 JDBC 接收器的信息,包括 URL、用户名、密码、VPC、子网和安全组。

  • catalogConnection – 数据目录中包含要写入的 JDBC URL 的连接名称。

  • options – JSON 名称-值对的字符串,提供写入 JDBC 数据存储所需的附加信息。这包括:

    • dbtable(必填)- JDBC 表名称。对于在数据库中支持架构的 JDBC 数据存储,指定 schema.table-name。如果未提供架构,则使用默认的“public”架构。以下示例显示一个指向名为 test 的架构的选项参数和数据库 test_db 中一个名为 test_table 的表。

      options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
    • database(必填)- JDBC 数据库名称。

    • 任何其他选项直接传递到 SparkSQL JDBC 写入器。有关更多信息,请参阅 Spark 的 Redshift 数据源

  • redshiftTmpDir – 用于某些数据接收器的临时目录。设置为 默认情况下为空。

  • transformationContext – 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。

  • catalogId – 正在访问的数据目录 ID(账户 ID)。当为空时,将使用调用方的默认账户 ID。

示例代码:

getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")

返回 DataSink

def getSink

def getSink( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" ) : DataSink

创建将数据写入目标的DataSink,例如亚马逊简单存储服务 (Amazon S3)、JDBC 或 Glue 数据目录,或者是 AWS Apache Kafka 或 Amazon Kinesis 数据流。

返回 DataSink

def 格式 getSinkWith

def getSinkWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSink

创建 DataSink,将数据写入 Amazon S3、JDBC、数据目录、Apache Kafka 或 Amazon Kinesis 数据流等目标写入数据。此外还将设置要写出到目标的数据格式。

返回 DataSink

def getSource

def getSource( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" pushDownPredicate ) : DataSource

创建从 Amazon S3、JDBC 或 Glue 数据目录等来源读取数据 AWS 的。数据源特性还支持 Kafka 和 Kinesis 流式传输数据源。

  • connectionType – 数据源的类型。请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项

  • connectionOptions – JSON 名称-值对的字符串,提供与数据源建立连接所需的附加信息。有关更多信息,请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项

    Kinesis 流式传输源需要以下连接选项:streamARNstartingPositioninferSchemaclassification

    Kinesis 流式传输源需要以下连接选项:connectionNametopicNamestartingOffsetsinferSchemaclassification

  • transformationContext – 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。

  • pushDownPredicate – 预测分区列。

返回 DataSource

Amazon Kinesis 流式传输源示例:

val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

Kafka 流式传输源示例:

val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

def 格式 getSourceWith

def getSourceWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

创建一个数据源特性,用于从 Amazon S3、JDBC 或 Glu AWS e 数据目录等来源读取数据,并设置存储在源中的数据的格式。

  • connectionType – 数据源的类型。请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项

  • options – JSON 名称-值对的字符串,提供与数据源建立连接所需的附加信息。请参阅 AWS Glue for Spark 中适用于 ETL 的连接类型和选项

  • transformationContext – 与任务书签要使用的接收器关联的转换上下文。设置为 默认情况下为空。

  • format – 源中所存储数据的格式。当 connectionType 为“s3”时,您也可以指定 format。可以是以下值之一:“avro”、“csv”、“grokLog”、“ion”、“json”、“xml”、“parquet”或“orc”。

  • formatOptions – JSON 名称-值对的字符串,提供用于在源中分析数据的附加选项。请参阅 数据格式选项

返回 DataSource

示例

在 A DynamicFrame mazon S3 上使用逗号分隔值 (CSV) 文件的数据源创建:

val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()

使用 JDBC 连接 DynamicFrame 从 PostgreSQL 数据源创建:

val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb", "dbtable": "public.company", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

使用 JDBC 连接 DynamicFrame 从 MySQL 的数据源创建:

val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb", "dbtable": "athenatest_nycflights13_csv", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

def getSparkSession

def getSparkSession : SparkSession

获取与此 GlueContext 关联的 SparkSession 对象。使用此 SparkSession 对象注册表和 UDF,以便在DataFrame创建时 DynamicFrames使用。

返回 SparkSession。

def startTransaction

def startTransaction(readOnly: Boolean):String

开启新事务。内部调用 Lake Formation startTransaction API。

  • readOnly –(布尔值)指示此事务应为只读还是读写。使用只读事务 ID 进行的写入将被拒绝。只读事务不需要提交。

返回事务 ID。

def commitTransaction

def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean

尝试提交指定的事务。可能在事务完成提交之前返回 commitTransaction。内部调用 Lake Formation commitTransaction API。

  • transactionId –(字符串)要提交的事务。

  • waitForCommit –(布尔值)确定是否立即返回 commitTransaction。默认值为 true。如果为假,则轮询 commitTransaction 并等待事务提交。等待时间限制为 1 分钟使用指数回退,最多重试 6 次。

返回布尔值,以指示是否完成提交。

def cancelTransaction

def cancelTransaction(transactionId: String): Unit

尝试取消指定的事务。在内部调用 Lake Format CancelTransactionion API。

  • transactionId –(字符串)要取消的事务。

如果事务以前已提交,则返回 TransactionCommittedException 异常。

def this

def this( sc : SparkContext, minPartitions : Int, targetPartitions : Int )

使用指定的 SparkContext、最小分区数和目标分区数创建 GlueContext 对象。

  • sc — 这些区域有: SparkContext

  • minPartitions – 最小分区数。

  • targetPartitions – 目标分区数。

返回 GlueContext

def this

def this( sc : SparkContext )

使用提供的 GlueContext 创建一个 SparkContext 对象。将最小分区数设置为 10,将目标分区数设置为 20。

  • sc — 这些区域有: SparkContext

返回 GlueContext

def this

def this( sparkContext : JavaSparkContext )

使用提供的 GlueContext 创建一个 JavaSparkContext 对象。将最小分区数设置为 10,将目标分区数设置为 20。

  • sparkContext — 这些区域有: JavaSparkContext

返回 GlueContext