本文為英文版的機器翻譯版本,如內容有任何歧義或不一致之處,概以英文版為準。
AWS Glue斯卡 GlueContext 拉
Package: com.amazonaws.services.glue
class GlueContext extends SQLContext(sc) (
@transient val sc : SparkContext,
val defaultSourcePartitioner : PartitioningStrategy )
GlueContext
是讀取和寫入 DynamicFrame 至 Amazon Simple Storage Service (Amazon S3)、 AWS Glue Data Catalog 、JDBC 等的進入點。此類別提供公用程式函數來建立 DataSource 特徵 和 DataSink 物件,從而用於讀取和寫入 DynamicFrame
。
如果從來源建立的分割區數低於分割區的閾值下限 (預設 10),您也可以使用 GlueContext
來設定在 DynamicFrame
中的分割區目標數 (預設 20)。
高清 addIngestionTime列
def addIngestionTimeColumns(
df : DataFrame,
timeGranularity : String = "") : dataFrame
附加擷取時間欄 (如 ingest_year
、ingest_month
、ingest_day
、ingest_hour
、ingest_minute
) 到輸入 DataFrame
。當您指定以 Amazon S3 為目標的 Data Catalog 資料表時,此函數會在 AWS Glue 產生的指令碼中自動產生。此函數會自動使用輸出資料表上的擷取時間欄來更新分割區。這可讓輸出資料在擷取時間自動分割,而不需要輸入資料中的明確擷取時間欄。
-
dataFrame
– 要將擷取時間欄附加到的dataFrame
。 -
timeGranularity
– 時間欄的精密程度。有效值為 "day
"、"hour
" 和 "minute
"。例如:如果 "hour
" 被傳遞給函數,原始dataFrame
會附加上 "ingest_year
"、"ingest_month
"、"ingest_day
" 和 "ingest_hour
" 時間欄。
傳回附加時間粒度欄後的資料框架。
範例:
glueContext.addIngestionTimeColumns(dataFrame, "hour")
高清 createDataFrame FromOptions
def createDataFrameFromOptions( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
傳回使用指定的連線和格式建立的 DataFrame
。此功能僅與 AWS Glue 串流來源搭配使用。
connectionType
– 串流連線類型。有效值包括kinesis
與kafka
。-
connectionOptions
– 連線選項,這些選項對於 Kinesis 和 Kafka 而言是不同的。您可以在 AWS Glue for Spark 中 ETL 的連線類型和選項 中找到每個串流資料來源的所有連線選項清單。請注意串流連線選項的下列不同處:-
Kinesis 串流來源需要
streamARN
、startingPosition
、inferSchema
以及classification
。 -
Kafka 串流來源需要
connectionName
、topicName
、startingOffsets
、inferSchema
以及classification
。
-
transformationContext
– 要使用的轉換細節 (選用)。format
– 格式化規格 (選用)。這是用於 Amazon S3 或支援多種格式的 AWS Glue 連線。如需有關支援格式的資訊,請參閱 AWS Glue for Spark 中的輸入與輸出的資料格式選項formatOptions
– 指定格式的格式選項。如需支援格式選項的詳細資訊,請參閱 資料格式選項。
Amazon Kinesis 串流來源範例:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))
Kafka 串流來源範例:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))
forEachBatch
forEachBatch(frame, batch_function, options)
將傳入的 batch_function
套用至從串流來源讀取的每個微批次。
-
frame
— 包 DataFrame 含當前微批次。 -
batch_function
– 將套用至每個微批次的函數。 -
options
– 索引鍵/值配對的集合,其中包含如何處理微批次的相關資訊。下列選項是必要的:-
windowSize
– 處理每個批次的時間量。 -
checkpointLocation
- 串流 ETL 任務的檢查點儲存位置。 -
batchMaxRetries
– 如果失敗,可重試批次的次數上限。預設值為 3。此選項僅在 Glue 2.0 及以上版本上才可設定。
-
範例:
glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))
高清 getCatalogSink
def getCatalogSink( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSink
建立 DataSink,以便寫入 Data Catalog 中定義之資料表中指定的位置。
database
— Data Catalog 中的資料庫名稱。tableName
— Data Catalog 中的資料表名稱。redshiftTmpDir
— 要與特定資料目的地搭配使用的臨時暫存目錄。設定為 預設為空值。transformationContext
— 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。additionalOptions
– 提供給 AWS Glue 的額外選項。catalogId
— 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。為 null 時,會使用發起人的預設帳戶 ID。
傳回 DataSink
。
高清 getCatalogSource
def getCatalogSource( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
pushDownPredicate : String = " "
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSource
建立 DataSource 特徵,以便從 Data Catalog 中的資料表定義中讀取資料。
database
— Data Catalog 中的資料庫名稱。tableName
— Data Catalog 中的資料表名稱。redshiftTmpDir
— 要與特定資料目的地搭配使用的臨時暫存目錄。設定為 預設為空值。transformationContext
— 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。pushDownPredicate
– 篩選分割區,而無需列出和讀取資料集中的所有檔案。如需詳細資訊,請參閱 使用 pushdown 述詞預先篩選。additionalOptions
– 選擇性的名稱/值對的集合。可能的選項包括 AWS Glue for Spark 中 ETL 的連線類型和選項 中列出的項目,除了endpointUrl
、streamName
、bootstrap.servers
、security.protocol
、topicName
、classification
以及delimiter
。另一個支援的選項是catalogPartitionPredicate
:catalogPartitionPredicate
— 您可以傳遞目錄表達式以根據索引欄進行篩選。這會將篩選下推至伺服器端。如需詳細資訊,請參閱 AWS Glue 分割區索引。注意push_down_predicate
和catalogPartitionPredicate
使用不同的語法。前者使用 Spark SQL 標準語法,後者使用 JSQL 剖析器。catalogId
— 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。為 null 時,會使用發起人的預設帳戶 ID。
傳回 DataSource
。
串流來源範例
val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()
def getJDBCSink
def getJDBCSink( catalogConnection : String,
options : JsonOptions,
redshiftTmpDir : String = "",
transformationContext : String = "",
catalogId: String = null
) : DataSink
建立 DataSink,以便寫入 Data Catalog 中 Connection
物件所指定的 JDBC 資料庫。此 Connection
物件擁有用來對 JDBC 目的地連線的資訊 (包括 URL、使用者名稱、密碼、VPC、子網路和安全群組)。
catalogConnection
— Data Catalog 中的連線名稱,其中包含要做為寫入目的地之 JDBC URL。options
— JSON 名稱值組的字串,可提供寫入 JDBC 資料存放區所需的其他資訊。其中包含:dbtable (必要) — JDBC 資料表的名稱。若是支援資料庫內結構描述的 JDBC 資料存放區,請指定
schema.table-name
。如果未提供結構描述,則會使用預設的 "public" 結構描述。以下範例說明 options 參數,它會指向資料庫test_db
中名為test
的結構描述和名為test_table
的資料表。options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
database (必要) — JDBC 資料庫的名稱。
任何其他選項都會直接傳遞至 SparkSQL JDBC 寫入器。如需詳細資訊,請參閱 Spark 的 Redshift 資料來源
。
redshiftTmpDir
— 要與特定資料目的地搭配使用的臨時暫存目錄。設定為 預設為空值。transformationContext
— 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。catalogId
— 要存取之 Data Catalog 的目錄 ID (帳戶 ID)。為 null 時,會使用發起人的預設帳戶 ID。
範例程式碼:
getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")
傳回 DataSink
。
def getSink
def getSink( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
) : DataSink
建立將資料DataSink寫入像 Amazon Simple Storage Service (Amazon S3)、JDBC 或 AWS Glue 資料型錄或 Apache 卡夫卡或 Amazon Kinesis 資料串流這樣的目的地。
-
connectionType
— 連線的類型。請參閱AWS Glue for Spark 中 ETL 的連線類型和選項。 -
connectionOptions
— JSON 名稱值組的字串,可提供與資料目的地建立連線的額外資料。請參閱AWS Glue for Spark 中 ETL 的連線類型和選項。 -
transformationContext
— 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。
傳回 DataSink
。
高清 getSinkWith格式
def getSinkWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSink
建立一DataSink個將資料寫入 Amazon S3、JDBC 或資料型錄等目的地,或者阿帕奇卡夫卡或 Amazon Kinesis 資料串流。還可以設定要寫出至目標的資料的格式。
connectionType
— 連線的類型。請參閱AWS Glue for Spark 中 ETL 的連線類型和選項。-
options
— JSON 名稱值組的字串,可提供與資料目的地建立連線的額外資料。請參閱AWS Glue for Spark 中 ETL 的連線類型和選項。 transformationContext
— 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。format
— 要從目的地寫出的資料格式。formatOptions
— JSON 名稱值組的字串,會提供在目的地格式化資料的其他選項。請參閱資料格式選項。
傳回 DataSink
。
def getSource
def getSource( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
pushDownPredicate
) : DataSource
建立DataSource 特徵可從 Amazon S3、JDBC 或 AWS Glue 資料型錄等來源讀取資料的資料。也支援 Kafka 和 Kinesis 串流資料來源。
connectionType
— 資料來源的類型。請參閱AWS Glue for Spark 中 ETL 的連線類型和選項。-
connectionOptions
— JSON 名稱值組的字串,可提供與資料來源建立連線的額外資料。如需詳細資訊,請參閱 AWS Glue for Spark 中 ETL 的連線類型和選項。Kinesis 串流來源需要下列連線選項:
streamARN
、startingPosition
、inferSchema
及classification
。Kafka 串流來源需要以下連線選項:
connectionName
、topicName
、startingOffsets
、inferSchema
及classification
。 transformationContext
— 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。pushDownPredicate
— 分割區欄上的述詞。
傳回 DataSource
。
Amazon Kinesis 串流來源範例:
val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
Kafka 串流來源範例:
val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
高清 getSourceWith格式
def getSourceWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
建立DataSource 特徵可從 Amazon S3、JDBC 或 AWS Glue 資料型錄等來源讀取資料的資料,並設定儲存在來源中的資料格式。
connectionType
– 資料來源的類型。請參閱AWS Glue for Spark 中 ETL 的連線類型和選項。-
options
– JSON 名稱值組的字串,可提供與資料來源建立連線的額外資料。請參閱AWS Glue for Spark 中 ETL 的連線類型和選項。 transformationContext
– 與由任務書籤使用之目的地關聯的轉換內容。設定為 預設為空值。format
– 來源中所存放資料的格式。當connectionType
為「s3」時,您也可以指定format
。可以是「avro」、「csv」、「grokLog」、「ion」、「json」、「xml」、「parquet」或「orc」其中之一。formatOptions
– JSON 名稱值組的字串,會提供在來源剖析資料的其他選項。請參閱資料格式選項。
傳回 DataSource
。
範例
DynamicFrame 從 Amazon S3 上以逗號分隔值 (CSV) 檔案為檔案的資料來源建立:
val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "
s3://csv/nycflights.csv
"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()
DynamicFrame 從使用 JDBC 連接的 PostgreSQL 的數據源創建一個:
val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://
databasePostgres-1
.rds.amazonaws.com:5432
/testdb
", "dbtable": "public.company
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
使用 JDBC 連接 DynamicFrame 從一個 MySQL 的數據源創建一個:
val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://
databaseMysql-1
.rds.amazonaws.com:3306
/testdb
", "dbtable": "athenatest_nycflights13_csv
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
高清 getSparkSession
def getSparkSession : SparkSession
取得與此 GlueContext 相關聯的 SparkSession
物件。使用此 SparkSession 物件可註冊表格和 UDF,以便與DataFrame
建立來源 DynamicFrames搭配使用。
傳回 SparkSession。
def startTransaction
def startTransaction(readOnly: Boolean):String
開始新交易。內部呼叫 Lake Formation startTransaction API。
readOnly
– (布林值) 指出此交易應該是唯讀,還是讀取和寫入。使用唯讀交易 ID 進行的寫入將被拒絕。唯讀交易不需要遞交。
傳回交易 ID。
def commitTransaction
def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean
嘗試遞交指定的交易。commitTransaction
可能會在交易完成遞交之前返回。內部呼叫 Lake Formation commitTransaction API。
transactionId
– (字串) 要遞交的交易。waitForCommit
– (布林值) 決定commitTransaction
是否立即傳回。預設值為 true。如為 False,commitTransaction
輪詢並等待,直到交易完成遞交。使用指數退避時,等待時間長度限制為 1 分鐘,最多可嘗試 6 次重試。
傳回一個布林值,指示遞交是否完成。
def cancelTransaction
def cancelTransaction(transactionId: String): Unit
嘗試取消指定的交易。內部調用 Lake Formation CancelTransactionAPI。
transactionId
– (字串) 要取消的交易。
如果交易先前已遞交,傳回 TransactionCommittedException
例外狀況。
def 此
def this( sc : SparkContext,
minPartitions : Int,
targetPartitions : Int )
使用指定的 SparkContext
、最小分割區和分割區目標來建立 GlueContext
物件。
sc
—SparkContext
。minPartitions
— 分割區最小數。targetPartitions
— 分割區目標數。
傳回 GlueContext
。
def 此
def this( sc : SparkContext )
透過提供的 SparkContext
建立 GlueContext
物件。將分割區的最小值設為 10,目標分割區設為 20。
sc
—SparkContext
。
傳回 GlueContext
。
def 此
def this( sparkContext : JavaSparkContext )
透過提供的 JavaSparkContext
建立 GlueContext
物件。將分割區的最小值設為 10,目標分割區設為 20。
sparkContext
—JavaSparkContext
。
傳回 GlueContext
。