AWS Glue Scala GlueContext API
パッケージ: com.amazonaws.services.glue
class GlueContext extends SQLContext(sc) (
@transient val sc : SparkContext,
val defaultSourcePartitioner : PartitioningStrategy )
GlueContext
は、Amazon Simple Storage Service (Amazon S3) や、AWS Glue Data Catalog、JDBC その他との間で、DynamicFrame の読み取りと書き込みを行うためのエントリポイントです。このクラスが提供するユーティリティ関数によって DataSource 特性 オブジェクトと DataSink オブジェクトが作成され、これらのオブジェクトを使用して DynamicFrame
を読み書きできます。
また、ソースから作成されたパーティションの数がパーティションの最小しきい値 (デフォルトは 10) 未満の場合は、DynamicFrame
で GlueContext
を使用してパーティションのターゲット数 (デフォルトは 20) を設定することもできます。
def addIngestionTimeColumns
def addIngestionTimeColumns(
df : DataFrame,
timeGranularity : String = "") : dataFrame
入力 DataFrame
への取り込み時間列 (ingest_year
、ingest_month
、ingest_day
、ingest_hour
、ingest_minute
) を追加します。Amazon S3 のデータカタログテーブルをターゲットとして指定する場合、この関数は、AWS Glue により生成されたスクリプト内で自動的に生成されます。この関数は、出力テーブル上で、取り込み時間列があるパーティションを自動的に更新します。これにより、入力データに明示的な取り込み時間列を指定しなくても、取り込み時間において出力データの自動的なパーティション化が行えます。
-
dataFrame
– 取り込み時間列の追加先であるdataFrame
。 -
timeGranularity
– 時間列の詳細度。有効な値は「day
」、「hour
」、および「minute
」です。例えば、関数に対し「hour
」が渡された場合、元のdataFrame
は「ingest_year
」、「ingest_month
」、「ingest_day
」に加え「ingest_hour
」の時間列を持つことになります。
時間の詳細度列を追加した後、そのデータフレームを返します。
例:
glueContext.addIngestionTimeColumns(dataFrame, "hour")
def createDataFrameFromOptions
def createDataFrameFromOptions( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
指定された接続と形式で作成された DataFrame
を返します。この関数は、AWS Glue のストリーミングソースのみで使用してください。
connectionType
– ストリーミング接続タイプ。有効な値は、kinesis
およびkafka
です。-
connectionOptions
– 接続オプション。これは、Kinesis と Kafka で異なります。各ストリーミングデータソースのすべての接続オプションの一覧は、AWS Glue for Spark での ETL の接続タイプとオプション で確認いただけます。ストリーミング接続オプションについては、以下の違いに注意してください。-
Kinesis ストリーミングのソースには
streamARN
、startingPosition
、inferSchema
、およびclassification
が必要です。 -
Kafka ストリーミングのソースには
connectionName
、topicName
、startingOffsets
、inferSchema
、およびclassification
が必要です。
-
transformationContext
– 使用する変換コンテキスト (オプション)。format
– 形式の仕様 (オプション)。Amazon S3 や、複数のフォーマットをサポートする AWS Glue 接続の場合に使用します。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。formatOptions
– 指定された形式に関するオプション。サポートされる形式オプションについては、「データ形式に関するオプション」を参照してください。
Amazon Kinesis ストリーミングソースの例:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))
Kafka ストリーミングソースの例:
val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))
forEachBatch
forEachBatch(frame, batch_function, options)
ストリーミングソースから読み取られるすべてのマイクロバッチに渡される、batch_function
を適用します。
-
frame
– 現在のマイクロバッチを含む DataFrame。 -
batch_function
– すべてのマイクロバッチに適用される関数。 -
options
– マイクロバッチの処理方法に関する情報を保持している、キーと値のペアの集合。以下のような必須オプションがあります。-
windowSize
– 各バッチの処理にかかる時間。 -
checkpointLocation
– ストリーミング ETL ジョブ用に、チェックポイントが格納される場所。 -
batchMaxRetries
– 失敗した場合にこのバッチを再試行する最大回数。デフォルト値は 3 です。このオプションは、Glue バージョン 2.0 以降でのみ設定可能です。
-
例:
glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))
def getCatalogSink
def getCatalogSink( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSink
Data Catalog で定義されているテーブル内の指定した場所に書き込みを行う DataSink を作成します。
database
– Data Catalog 内のデータベース名。tableName
– Data Catalog 内のテーブル名。redshiftTmpDir
– 特定のデータシンクで使用する一時的なステージングディレクトリ。デフォルトでは空に設定されます。transformationContext
– ジョブのブックマークで使用されるシンクに関連付けられた変換コンテキスト。デフォルトでは空に設定されます。additionalOptions
– AWS Glue で使用する追加のオプション。catalogId
– 現在アクセスされている Data Catalog のカタログ ID (アカウント ID)。null の場合は、呼び出し元のアカウント ID のデフォルトが使用されます。
戻り値は DataSink
。
def getCatalogSource
def getCatalogSource( database : String,
tableName : String,
redshiftTmpDir : String = "",
transformationContext : String = ""
pushDownPredicate : String = " "
additionalOptions: JsonOptions = JsonOptions.empty,
catalogId: String = null
) : DataSource
Data Catalog のテーブル定義からデータを読み取る DataSource 特性 を作成します。
database
– Data Catalog 内のデータベース名。tableName
– Data Catalog 内のテーブル名。redshiftTmpDir
– 特定のデータシンクで使用する一時的なステージングディレクトリ。デフォルトでは空に設定されます。transformationContext
– ジョブのブックマークで使用されるシンクに関連付けられた変換コンテキスト。デフォルトでは空に設定されます。pushDownPredicate
– データセットのすべてのファイルをリストアップして読み取る必要がないフィルタパーティション。詳しくは、「プッシュダウン述語を使用した事前フィルタ処理」を参照してください。additionalOptions
- オプションの名前と値のペアのコレクション。AWS Glue for Spark での ETL の接続タイプとオプション でリストされている使用可能なオプション (endpointUrl
、streamName
、bootstrap.servers
、security.protocol
、topicName
、classification
、およびdelimiter
を除く)。別のオプションとして、catalogPartitionPredicate
もサポートされています。catalogPartitionPredicate
– カタログ式を渡して、インデックス列に基づいたフィルタリングができます。これにより、フィルタリングをサーバー側で処理できます。詳細については、「AWS Glue パーティションインデックス」を参照してください。push_down_predicate
とcatalogPartitionPredicate
では、異なる構文が使用されることに注意してください。前者では Spark SQL の標準構文を使用し、後者では JSQL パーサーを使用します。catalogId
– 現在アクセスされている Data Catalog のカタログ ID (アカウント ID)。null の場合は、呼び出し元のアカウント ID のデフォルトが使用されます。
戻り値は DataSource
。
ストリーミングソースの例
val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()
def getJDBCSink
def getJDBCSink( catalogConnection : String,
options : JsonOptions,
redshiftTmpDir : String = "",
transformationContext : String = "",
catalogId: String = null
) : DataSink
Data Catalog 内の Connection
オブジェクトに指定されている、JDBC データベースに対し書き込みを行う DataSink を作成します。Connection
オブジェクトには、URL、ユーザー名、パスワード、VPC、サブネット、セキュリティグループなど、JDBC シンクに接続するための情報があります。
catalogConnection
– 書き込み先の JDBC URL を含む Data Catalog の接続名。options
– JDBC データストアへの書き込みに必要な追加情報を提供する、名前/値ペアを示す JSON 形式の文字列。これには、以下が含まれます。dbtable (必須) - JDBC テーブルの名前。データベース内でスキーマをサポートする JDBC データストアの場合、
schema.table-name
を指定します。スキーマを指定しない場合、デフォルトの「パブリック」スキーマが使用されます。次の例では、test_db
データベースのtest
という名前のスキーマおよびtest_table
という名前のテーブルを指すオプションパラメータを示します。options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
database (必須) - JDBC データベースの名前。
その他のオプションはすべて SparkSQL JDBC ライターに直接渡されます。詳細については、「Redshift data source for Spark
」を参照してください。
redshiftTmpDir
– 特定のデータシンクで使用する一時的なステージングディレクトリ。デフォルトでは空に設定されます。transformationContext
– ジョブのブックマークで使用されるシンクに関連付けられた変換コンテキスト。デフォルトでは空に設定されます。catalogId
– 現在アクセスされている Data Catalog のカタログ ID (アカウント ID)。null の場合は、呼び出し元のアカウント ID のデフォルトが使用されます。
コード例:
getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")
戻り値は DataSink
。
def getSink
def getSink( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
) : DataSink
Amazon Simple Storage Service (Amazon S3)、JDBC、AWS Glue Data Catalog、Apache Kafka、Amazon Kinesis Data Streams などの書き込み先にデータを書き込む DataSink を作成します。
-
connectionType
- 接続のタイプ。「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。 -
connectionOptions
– データシンクとの接続を確立するための追加情報を提供する JSON 形式の名前と値のペアの文字列。「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。 -
transformationContext
– ジョブのブックマークで使用されるシンクに関連付けられた変換コンテキスト。デフォルトでは空に設定されます。
戻り値は DataSink
。
def getSinkWithFormat
def getSinkWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSink
Amazon S3、JDBC、Data Catalog、Apache Kafka、Amazon Kinesis Data Streams などの書き込み先にデータを書き込む DataSink を作成します。書き込み先に書き込むデータの形式も設定します。
connectionType
- 接続のタイプ。「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。-
options
- データシンクとの接続を確立するための追加情報を提供する JSON 形式の名前/値ペアの文字列。「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。 transformationContext
– ジョブのブックマークで使用されるシンクに関連付けられた変換コンテキスト。デフォルトでは空に設定されます。format
- 書き込み先に書き込むデータの形式。formatOptions
— 書き込み先でデータをフォーマットするための追加オプションを提供する JSON 形式の名前と値のペアの文字列。「データ形式に関するオプション」を参照してください。
戻り値は DataSink
。
def getSource
def getSource( connectionType : String,
connectionOptions : JsonOptions,
transformationContext : String = ""
pushDownPredicate
) : DataSource
Amazon S3、JDBC、AWS Glue Data Catalog などのソースからデータを読み取る DataSource 特性 を作成します。Kafka および Kinesis ストリーミングデータソースもサポートしています。
connectionType
– データソースの型。「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。-
connectionOptions
– データソースとの接続を確立するための追加情報を提供する JSON 形式の名前と値のペアの文字列。詳細については、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。Kinesis ストリーミングソースには、
streamARN
、startingPosition
、inferSchema
、およびclassification
の接続オプションが必要です。Kafka ストリーミングソースには、
connectionName
、topicName
、startingOffsets
、inferSchema
、およびclassification
の接続オプションが必要です。 transformationContext
– ジョブのブックマークで使用されるシンクに関連付けられた変換コンテキスト。デフォルトでは空に設定されます。pushDownPredicate
— パーティション列に関する述語です。
戻り値は DataSource
。
Amazon Kinesis ストリーミングソースの例:
val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
Kafka ストリーミングソースの例:
val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }
def getSourceWithFormat
def getSourceWithFormat( connectionType : String,
options : JsonOptions,
transformationContext : String = "",
format : String = null,
formatOptions : JsonOptions = JsonOptions.empty
) : DataSource
Amazon S3、JDBC、AWS Glue Data Catalog などのソースからのデータ読み取りと、ソースに保存されているデータの形式指定を行うための DataSource 特性 を作成します。
connectionType
– データソースの型。「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。-
options
– データソースとの接続を確立するための追加情報を提供する、名前と値のペアを示す JSON 形式の文字列。「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。 transformationContext
– ジョブのブックマークで使用するシンクに関連付けられている変換コンテキスト。デフォルトでは空に設定されます。format
– ソースに保存されるデータの形式。connectionType
が "s3" のときは、format
を指定することもできます。"avro"、"csv"、"grokLog"、"ion"、"json"、"xml"、"parquet"、"orc" のいずれかになります。formatOptions
– ソースでデータを解析するための追加オプションを提供する、名前と値ペアを示す JSON 形式の文字列。「データ形式に関するオプション」を参照してください。
戻り値は DataSource
。
例
データソース (Amazon S3 上のコンマ区切り値 (CSV) ファイル) から DynamicFrame を作成します。
val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "
s3://csv/nycflights.csv
"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()
データソース (JDBC 接続を使用している PostgreSQL) から DynamicFrame を作成します。
val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://
databasePostgres-1
.rds.amazonaws.com:5432
/testdb
", "dbtable": "public.company
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
データソース (JDBC 接続を使用している MySQL) からDynamicFrameを作成します。
val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://
databaseMysql-1
.rds.amazonaws.com:3306
/testdb
", "dbtable": "athenatest_nycflights13_csv
", "redshiftTmpDir":"", "user":"username
", "password":"password123
" }"""), transformationContext = "datasource0").getDynamicFrame()
def getSparkSession
def getSparkSession : SparkSession
この GlueContext に関連付けられている SparkSession
オブジェクトを取得します。この SparkSession オブジェクトでは、DynamicFrames から作成した DataFrame
で使用するテーブルと UDF を登録します。
SparkSession を返します。
def startTransaction
def startTransaction(readOnly: Boolean):String
新しいトランザクションの開始。Lake Formation startTransaction API を内部的に呼び出します。
readOnly
— (Boolean) このトランザクションを読み取り専用にするか、または読み取りおよび書き込みを行うかを示します。読み取り専用のトランザクション ID を使用した書き込みは拒否されます。読み取り専用トランザクションはコミットする必要はありません。
トランザクション ID を返します。
def commitTrac
def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean
指定されたトランザクションをコミットしようとします。commitTransaction
トランザクションのコミットが完了する前に戻ることがあります。Lake Formation startTransaction API を内部的に呼び出します。
transactionId
— (文字列) コミットするトランザクション。waitForCommit
— (ブール値)commitTransaction
がすぐに戻るかどうか指定します。デフォルト値は True です。false の場合、commitTransaction
はトランザクションがコミットされるまでポーリングし待機します。最大で 6 回の再試行でエクスポネンシャルバックオフを使用すると、待機時間は 1 分に制限されます。
コミットが完了したかどうかを示すブール値を返します。
def cancelTransaction
def cancelTransaction(transactionId: String): Unit
指定されたトランザクションをキャンセルしようとします。Lake Formation startTransaction API を内部的に呼び出します。
transactionId
— (文字列) キャンセルするトランザクション。
戻り値は、トランザクションが以前にコミットされた場合は TransactionCommittedException
例外です。
def this
def this( sc : SparkContext,
minPartitions : Int,
targetPartitions : Int )
指定した SparkContext
、最小パーティション数、ターゲットパーティション数を使用して GlueContext
オブジェクトを作成します。
sc
-SparkContext
。minPartitions
- 最小パーティション数。targetPartitions
- ターゲットパーティション数。
戻り値は GlueContext
。
def this
def this( sc : SparkContext )
渡された SparkContext
で GlueContext
オブジェクトを作成します。最小限のパーティション数を 10 に設定し、ターゲットパーティション数を 20 に設定します。
sc
-SparkContext
。
戻り値は GlueContext
。
def this
def this( sparkContext : JavaSparkContext )
渡された JavaSparkContext
で GlueContext
オブジェクトを作成します。最小限のパーティション数を 10 に設定し、ターゲットパーティション数を 20 に設定します。
sparkContext
-JavaSparkContext
。
戻り値は GlueContext
。