AWS Glue斯卡 GlueContext 拉 - AWS Glue

本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。

AWS Glue斯卡 GlueContext 拉

Package: com.amazonaws.services.glue

class GlueContext extends SQLContext(sc) ( @transient val sc : SparkContext, val defaultSourcePartitioner : PartitioningStrategy )

GlueContext 是讀取和寫入 DynamicFrame 至 Amazon Simple Storage Service (Amazon S3)、 AWS Glue Data Catalog 、JDBC 等的進入點。此類別提供公用程式函數來建立 DataSource 特徵DataSink 物件,從而用於讀取和寫入 DynamicFrame

如果從來源建立的分割區數低於分割區的閾值下限 (預設 10),您也可以使用 GlueContext 來設定在 DynamicFrame 中的分割區目標數 (預設 20)。

高清 addIngestionTime列

def addIngestionTimeColumns( df : DataFrame, timeGranularity : String = "") : dataFrame

附加擷取時間欄 (如 ingest_yearingest_monthingest_dayingest_houringest_minute) 到輸入 DataFrame。當您指定以 Amazon S3 為目標的 Data Catalog 資料表時,此函數會在 AWS Glue 產生的指令碼中自動產生。此函數會自動使用輸出資料表上的擷取時間欄來更新分割區。這可讓輸出資料在擷取時間自動分割,而不需要輸入資料中的明確擷取時間欄。

  • dataFrame – 要將擷取時間欄附加到的 dataFrame

  • timeGranularity – 時間欄的精密程度。有效值為 "day"、"hour" 和 "minute"。例如:如果 "hour" 被傳遞給函數,原始 dataFrame 會附加上 "ingest_year"、"ingest_month"、"ingest_day" 和 "ingest_hour" 時間欄。

傳回附加時間粒度欄後的資料框架。

範例:

glueContext.addIngestionTimeColumns(dataFrame, "hour")

高清 createDataFrame FromOptions

def createDataFrameFromOptions( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

傳回使用指定的連線和格式建立的 DataFrame。此功能僅與 AWS Glue 串流來源搭配使用。

  • connectionType – 串流連線類型。有效值包括 kinesiskafka

  • connectionOptions – 連線選項,這些選項對於 Kinesis 和 Kafka 而言是不同的。您可以在 AWS Glue for Spark 中 ETL 的連線類型和選項 中找到每個串流資料來源的所有連線選項清單。請注意串流連線選項的下列不同處:

    • Kinesis 串流來源需要 streamARNstartingPositioninferSchema 以及 classification

    • Kafka 串流來源需要 connectionNametopicNamestartingOffsetsinferSchema 以及 classification

  • transformationContext – 要使用的轉換細節 (選用)。

  • format – 格式化規格 (選用)。這是用於 Amazon S3 或支援多種格式的 AWS Glue 連線。如需有關支援格式的資訊,請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項

  • formatOptions – 指定格式的格式選項。如需支援格式選項的詳細資訊,請參閱 資料格式選項

Amazon Kinesis 串流來源範例:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))

Kafka 串流來源範例:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))

forEachBatch

forEachBatch(frame, batch_function, options)

將傳入的 batch_function 套用至從串流來源讀取的每個微批次。

  • frame— 包 DataFrame 含當前微批次。

  • batch_function – 將套用至每個微批次的函數。

  • options – 索引鍵/值配對的集合,其中包含如何處理微批次的相關資訊。下列選項是必要的:

    • windowSize – 處理每個批次的時間量。

    • checkpointLocation - 串流 ETL 任務的檢查點儲存位置。

    • batchMaxRetries – 如果失敗,可重試批次的次數上限。預設值為 3。此選項僅在 Glue 2.0 及以上版本上才可設定。

範例:

glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))

高清 getCatalogSink

def getCatalogSink( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSink

建立 DataSink,以便寫入 Data Catalog 中定義之資料表中指定的位置。

  • database —  Data Catalog 中的資料庫名稱。

  • tableName —  Data Catalog 中的資料表名稱。

  • redshiftTmpDir — 要與特定資料目的地搭配使用的臨時暫存目錄。設定為 預設為空值。

  • transformationContext — 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。

  • additionalOptions – 提供給 AWS Glue 的額外選項。

  • catalogId — 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。為 null 時,會使用發起人的預設帳戶 ID。

傳回 DataSink

高清 getCatalogSource

def getCatalogSource( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" pushDownPredicate : String = " " additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSource

建立 DataSource 特徵,以便從 Data Catalog 中的資料表定義中讀取資料。

  • database —  Data Catalog 中的資料庫名稱。

  • tableName —  Data Catalog 中的資料表名稱。

  • redshiftTmpDir — 要與特定資料目的地搭配使用的臨時暫存目錄。設定為 預設為空值。

  • transformationContext — 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。

  • pushDownPredicate – 篩選分割區,而無需列出和讀取資料集中的所有檔案。如需詳細資訊,請參閱 使用 pushdown 述詞預先篩選

  • additionalOptions – 選擇性的名稱/值對的集合。可能的選項包括 AWS Glue for Spark 中 ETL 的連線類型和選項 中列出的項目,除了 endpointUrlstreamNamebootstrap.serverssecurity.protocoltopicNameclassification 以及delimiter。另一個支援的選項是 catalogPartitionPredicate

    catalogPartitionPredicate — 您可以傳遞目錄表達式以根據索引欄進行篩選。這會將篩選下推至伺服器端。如需詳細資訊,請參閱 AWS Glue 分割區索引。注意 push_down_predicatecatalogPartitionPredicate 使用不同的語法。前者使用 Spark SQL 標準語法,後者使用 JSQL 剖析器。

  • catalogId — 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。為 null 時,會使用發起人的預設帳戶 ID。

傳回 DataSource

串流來源範例

val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()

def getJDBCSink

def getJDBCSink( catalogConnection : String, options : JsonOptions, redshiftTmpDir : String = "", transformationContext : String = "", catalogId: String = null ) : DataSink

建立 DataSink,以便寫入 Data Catalog 中 Connection 物件所指定的 JDBC 資料庫。此 Connection 物件擁有用來對 JDBC 目的地連線的資訊 (包括 URL、使用者名稱、密碼、VPC、子網路和安全群組)。

  • catalogConnection —  Data Catalog 中的連線名稱,其中包含要做為寫入目的地之 JDBC URL。

  • options — JSON 名稱值組的字串,可提供寫入 JDBC 資料存放區所需的其他資訊。其中包含:

    • dbtable (必要) — JDBC 資料表的名稱。若是支援資料庫內結構描述的 JDBC 資料存放區,請指定 schema.table-name。如果未提供結構描述,則會使用預設的 "public" 結構描述。以下範例說明 options 參數,它會指向資料庫 test_db 中名為 test 的結構描述和名為 test_table 的資料表。

      options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
    • database (必要) — JDBC 資料庫的名稱。

    • 任何其他選項都會直接傳遞至 SparkSQL JDBC 寫入器。如需詳細資訊,請參閱 Spark 的 Redshift 資料來源

  • redshiftTmpDir — 要與特定資料目的地搭配使用的臨時暫存目錄。設定為 預設為空值。

  • transformationContext — 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。

  • catalogId — 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。為 null 時,會使用發起人的預設帳戶 ID。

範例程式碼:

getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")

傳回 DataSink

def getSink

def getSink( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" ) : DataSink

建立將資料DataSink寫入像 Amazon Simple Storage Service (Amazon S3)、JDBC 或 AWS Glue 資料型錄或 Apache 卡夫卡或 Amazon Kinesis 資料串流這樣的目的地。

傳回 DataSink

高清 getSinkWith格式

def getSinkWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSink

建立一DataSink個將資料寫入 Amazon S3、JDBC 或資料型錄等目的地,或者阿帕奇卡夫卡或 Amazon Kinesis 資料串流。還可以設定要寫出至目標的資料的格式。

傳回 DataSink

def getSource

def getSource( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" pushDownPredicate ) : DataSource

建立DataSource 特徵可從 Amazon S3、JDBC 或 AWS Glue 資料型錄等來源讀取資料的資料。也支援 Kafka 和 Kinesis 串流資料來源。

  • connectionType — 資料來源的類型。請參閱AWS Glue for Spark 中 ETL 的連線類型和選項

  • connectionOptions — JSON 名稱值組的字串,可提供與資料來源建立連線的額外資料。如需詳細資訊,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項

    Kinesis 串流來源需要下列連線選項:streamARNstartingPositioninferSchemaclassification

    Kafka 串流來源需要以下連線選項:connectionNametopicNamestartingOffsetsinferSchemaclassification

  • transformationContext — 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。

  • pushDownPredicate — 分割區欄上的述詞。

傳回 DataSource

Amazon Kinesis 串流來源範例:

val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

Kafka 串流來源範例:

val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

高清 getSourceWith格式

def getSourceWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

建立DataSource 特徵可從 Amazon S3、JDBC 或 AWS Glue 資料型錄等來源讀取資料的資料,並設定儲存在來源中的資料格式。

  • connectionType – 資料來源的類型。請參閱AWS Glue for Spark 中 ETL 的連線類型和選項

  • options – JSON 名稱值組的字串,可提供與資料來源建立連線的額外資料。請參閱AWS Glue for Spark 中 ETL 的連線類型和選項

  • transformationContext – 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。

  • format – 來源中所存放資料的格式。當 connectionType 為「s3」時,您也可以指定 format。可以是「avro」、「csv」、「grokLog」、「ion」、「json」、「xml」、「parquet」或「orc」其中之一。

  • formatOptions – JSON 名稱值組的字串,會提供在來源剖析資料的其他選項。請參閱資料格式選項

傳回 DataSource

範例

DynamicFrame 從 Amazon S3 上以逗號分隔值 (CSV) 檔案為檔案的資料來源建立:

val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()

DynamicFrame 從使用 JDBC 連接的 PostgreSQL 的數據源創建一個:

val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb", "dbtable": "public.company", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

使用 JDBC 連接 DynamicFrame 從一個 MySQL 的數據源創建一個:

val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb", "dbtable": "athenatest_nycflights13_csv", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

高清 getSparkSession

def getSparkSession : SparkSession

取得與此 GlueContext 相關聯的 SparkSession 物件。使用此 SparkSession 物件可註冊表格和 UDF,以便與DataFrame建立來源 DynamicFrames搭配使用。

傳回 SparkSession。

def startTransaction

def startTransaction(readOnly: Boolean):String

開始新交易。內部呼叫 Lake Formation startTransaction API。

  • readOnly – (布林值) 指出此交易應該是唯讀,還是讀取和寫入。使用唯讀交易 ID 進行的寫入將被拒絕。唯讀交易不需要遞交。

傳回交易 ID。

def commitTransaction

def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean

嘗試遞交指定的交易。commitTransaction 可能會在交易完成遞交之前返回。內部呼叫 Lake Formation commitTransaction API。

  • transactionId – (字串) 要遞交的交易。

  • waitForCommit – (布林值) 決定 commitTransaction 是否立即傳回。預設值為 true。如為 False,commitTransaction 輪詢並等待,直到交易完成遞交。使用指數退避時,等待時間長度限制為 1 分鐘,最多可嘗試 6 次重試。

傳回一個布林值,指示遞交是否完成。

def cancelTransaction

def cancelTransaction(transactionId: String): Unit

嘗試取消指定的交易。內部調用 Lake Formation CancelTransactionAPI。

  • transactionId – (字串) 要取消的交易。

如果交易先前已遞交,傳回 TransactionCommittedException 例外狀況。

def 此

def this( sc : SparkContext, minPartitions : Int, targetPartitions : Int )

使用指定的 SparkContext、最小分割區和分割區目標來建立 GlueContext 物件。

  • sc — SparkContext

  • minPartitions — 分割區最小數。

  • targetPartitions — 分割區目標數。

傳回 GlueContext

def 此

def this( sc : SparkContext )

透過提供的 SparkContext 建立 GlueContext 物件。將分割區的最小值設為 10,目標分割區設為 20。

  • sc — SparkContext

傳回 GlueContext

def 此

def this( sparkContext : JavaSparkContext )

透過提供的 JavaSparkContext 建立 GlueContext 物件。將分割區的最小值設為 10,目標分割區設為 20。

  • sparkContext — JavaSparkContext

傳回 GlueContext