AWS Glue Scala GlueContext API - AWS Glue

AWS Glue Scala GlueContext API

パッケージ: com.amazonaws.services.glue

class GlueContext extends SQLContext(sc) ( @transient val sc : SparkContext, val defaultSourcePartitioner : PartitioningStrategy )

GlueContext は、Amazon Simple Storage Service (Amazon S3) や、AWS Glue Data Catalog、JDBC その他との間で、DynamicFrame の読み取りと書き込みを行うためのエントリポイントです。このクラスが提供するユーティリティ関数によって DataSource 特性 オブジェクトと DataSink オブジェクトが作成され、これらのオブジェクトを使用して DynamicFrame を読み書きできます。

また、ソースから作成されたパーティションの数がパーティションの最小しきい値 (デフォルトは 10) 未満の場合は、DynamicFrameGlueContext を使用してパーティションのターゲット数 (デフォルトは 20) を設定することもできます。

def addIngestionTimeColumns

def addIngestionTimeColumns( df : DataFrame, timeGranularity : String = "") : dataFrame

入力 DataFrame への取り込み時間列 (ingest_yearingest_monthingest_dayingest_houringest_minute) を追加します。Amazon S3 のデータカタログテーブルをターゲットとして指定する場合、この関数は、AWS Glue により生成されたスクリプト内で自動的に生成されます。この関数は、出力テーブル上で、取り込み時間列があるパーティションを自動的に更新します。これにより、入力データに明示的な取り込み時間列を指定しなくても、取り込み時間において出力データの自動的なパーティション化が行えます。

  • dataFrame – 取り込み時間列の追加先である dataFrame

  • timeGranularity – 時間列の詳細度。有効な値は「day」、「hour」、および「minute」です。例えば、関数に対し「hour」が渡された場合、元の dataFrame は「ingest_year」、「ingest_month」、「ingest_day」に加え「ingest_hour」の時間列を持つことになります。

時間の詳細度列を追加した後、そのデータフレームを返します。

例:

glueContext.addIngestionTimeColumns(dataFrame, "hour")

def createDataFrameFromOptions

def createDataFrameFromOptions( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

指定された接続と形式で作成された DataFrame を返します。この関数は、AWS Glue のストリーミングソースのみで使用してください。

  • connectionType – ストリーミング接続タイプ。有効な値は、kinesis および kafka です。

  • connectionOptions – 接続オプション。これは、Kinesis と Kafka で異なります。各ストリーミングデータソースのすべての接続オプションの一覧は、AWS Glue for Spark での ETL の接続タイプとオプション で確認いただけます。ストリーミング接続オプションについては、以下の違いに注意してください。

    • Kinesis ストリーミングのソースには streamARNstartingPositioninferSchema、および classification が必要です。

    • Kafka ストリーミングのソースには connectionNametopicNamestartingOffsetsinferSchema、および classification が必要です。

  • transformationContext – 使用する変換コンテキスト (オプション)。

  • format – 形式の仕様 (オプション)。Amazon S3 や、複数のフォーマットをサポートする AWS Glue 接続の場合に使用します。サポートされる形式については、「AWS Glue for Spark での入出力のデータ形式に関するオプション」を参照してください。

  • formatOptions – 指定された形式に関するオプション。サポートされる形式オプションについては、「データ形式に関するオプション」を参照してください。

Amazon Kinesis ストリーミングソースの例:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kinesis", connectionOptions = JsonOptions("""{"streamName": "example_stream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json"}}"""))

Kafka ストリーミングソースの例:

val data_frame_datasource0 = glueContext.createDataFrameFromOptions(transformationContext = "datasource0", connectionType = "kafka", connectionOptions = JsonOptions("""{"connectionName": "example_connection", "topicName": "example_topic", "startingPosition": "earliest", "inferSchema": "false", "classification": "json", "schema":"`column1` STRING, `column2` STRING"}"""))

forEachBatch

forEachBatch(frame, batch_function, options)

ストリーミングソースから読み取られるすべてのマイクロバッチに渡される、batch_function を適用します。

  • frame – 現在のマイクロバッチを含む DataFrame。

  • batch_function – すべてのマイクロバッチに適用される関数。

  • options – マイクロバッチの処理方法に関する情報を保持している、キーと値のペアの集合。以下のような必須オプションがあります。

    • windowSize – 各バッチの処理にかかる時間。

    • checkpointLocation – ストリーミング ETL ジョブ用に、チェックポイントが格納される場所。

    • batchMaxRetries – 失敗した場合にこのバッチを再試行する最大回数。デフォルト値は 3 です。このオプションは、Glue バージョン 2.0 以降でのみ設定可能です。

例:

glueContext.forEachBatch(data_frame_datasource0, (dataFrame: Dataset[Row], batchId: Long) => { if (dataFrame.count() > 0) { val datasource0 = DynamicFrame(glueContext.addIngestionTimeColumns(dataFrame, "hour"), glueContext) // @type: DataSink // @args: [database = "tempdb", table_name = "fromoptionsoutput", stream_batch_time = "100 seconds", // stream_checkpoint_location = "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/", // transformation_ctx = "datasink1"] // @return: datasink1 // @inputs: [frame = datasource0] val options_datasink1 = JsonOptions( Map("partitionKeys" -> Seq("ingest_year", "ingest_month","ingest_day", "ingest_hour"), "enableUpdateCatalog" -> true)) val datasink1 = glueContext.getCatalogSink( database = "tempdb", tableName = "fromoptionsoutput", redshiftTmpDir = "", transformationContext = "datasink1", additionalOptions = options_datasink1).writeDynamicFrame(datasource0) } }, JsonOptions("""{"windowSize" : "100 seconds", "checkpointLocation" : "s3://from-options-testing-eu-central-1/fromOptionsOutput/checkpoint/"}"""))

def getCatalogSink

def getCatalogSink( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSink

Data Catalog で定義されているテーブル内の指定した場所に書き込みを行う DataSink を作成します。

  • database – Data Catalog 内のデータベース名。

  • tableName – Data Catalog 内のテーブル名。

  • redshiftTmpDir – 特定のデータシンクで使用する一時的なステージングディレクトリ。デフォルトでは空に設定されます。

  • transformationContext – ジョブのブックマークで使用されるシンクに関連付けられた変換コンテキスト。デフォルトでは空に設定されます。

  • additionalOptions – AWS Glue で使用する追加のオプション。

  • catalogId – 現在アクセスされている Data Catalog のカタログ ID (アカウント ID)。null の場合は、呼び出し元のアカウント ID のデフォルトが使用されます。

戻り値は DataSink

def getCatalogSource

def getCatalogSource( database : String, tableName : String, redshiftTmpDir : String = "", transformationContext : String = "" pushDownPredicate : String = " " additionalOptions: JsonOptions = JsonOptions.empty, catalogId: String = null ) : DataSource

Data Catalog のテーブル定義からデータを読み取る DataSource 特性 を作成します。

  • database – Data Catalog 内のデータベース名。

  • tableName – Data Catalog 内のテーブル名。

  • redshiftTmpDir – 特定のデータシンクで使用する一時的なステージングディレクトリ。デフォルトでは空に設定されます。

  • transformationContext – ジョブのブックマークで使用されるシンクに関連付けられた変換コンテキスト。デフォルトでは空に設定されます。

  • pushDownPredicate – データセットのすべてのファイルをリストアップして読み取る必要がないフィルタパーティション。詳しくは、「プッシュダウン述語を使用した事前フィルタ処理」を参照してください。

  • additionalOptions - オプションの名前と値のペアのコレクション。AWS Glue for Spark での ETL の接続タイプとオプション でリストされている使用可能なオプション (endpointUrlstreamNamebootstrap.serverssecurity.protocoltopicNameclassification、および delimiter を除く)。別のオプションとして、catalogPartitionPredicate もサポートされています。

    catalogPartitionPredicate – カタログ式を渡して、インデックス列に基づいたフィルタリングができます。これにより、フィルタリングをサーバー側で処理できます。詳細については、「AWS Glue パーティションインデックス」を参照してください。push_down_predicatecatalogPartitionPredicate では、異なる構文が使用されることに注意してください。前者では Spark SQL の標準構文を使用し、後者では JSQL パーサーを使用します。

  • catalogId – 現在アクセスされている Data Catalog のカタログ ID (アカウント ID)。null の場合は、呼び出し元のアカウント ID のデフォルトが使用されます。

戻り値は DataSource

ストリーミングソースの例

val data_frame_datasource0 = glueContext.getCatalogSource( database = "tempdb", tableName = "test-stream-input", redshiftTmpDir = "", transformationContext = "datasource0", additionalOptions = JsonOptions("""{ "startingPosition": "TRIM_HORIZON", "inferSchema": "false"}""") ).getDataFrame()

def getJDBCSink

def getJDBCSink( catalogConnection : String, options : JsonOptions, redshiftTmpDir : String = "", transformationContext : String = "", catalogId: String = null ) : DataSink

Data Catalog 内の Connection オブジェクトに指定されている、JDBC データベースに対し書き込みを行う DataSink を作成します。Connection オブジェクトには、URL、ユーザー名、パスワード、VPC、サブネット、セキュリティグループなど、JDBC シンクに接続するための情報があります。

  • catalogConnection – 書き込み先の JDBC URL を含む Data Catalog の接続名。

  • options – JDBC データストアへの書き込みに必要な追加情報を提供する、名前/値ペアを示す JSON 形式の文字列。これには、以下が含まれます。

    • dbtable (必須) - JDBC テーブルの名前。データベース内でスキーマをサポートする JDBC データストアの場合、schema.table-name を指定します。スキーマを指定しない場合、デフォルトの「パブリック」スキーマが使用されます。次の例では、test_db データベースの test という名前のスキーマおよび test_table という名前のテーブルを指すオプションパラメータを示します。

      options = JsonOptions("""{"dbtable": "test.test_table", "database": "test_db"}""")
    • database (必須) - JDBC データベースの名前。

    • その他のオプションはすべて SparkSQL JDBC ライターに直接渡されます。詳細については、「Redshift data source for Spark」を参照してください。

  • redshiftTmpDir – 特定のデータシンクで使用する一時的なステージングディレクトリ。デフォルトでは空に設定されます。

  • transformationContext – ジョブのブックマークで使用されるシンクに関連付けられた変換コンテキスト。デフォルトでは空に設定されます。

  • catalogId – 現在アクセスされている Data Catalog のカタログ ID (アカウント ID)。null の場合は、呼び出し元のアカウント ID のデフォルトが使用されます。

コード例:

getJDBCSink(catalogConnection = "my-connection-name", options = JsonOptions("""{"dbtable": "my-jdbc-table", "database": "my-jdbc-db"}"""), redshiftTmpDir = "", transformationContext = "datasink4")

戻り値は DataSink

def getSink

def getSink( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" ) : DataSink

Amazon Simple Storage Service (Amazon S3)、JDBC、AWS Glue Data Catalog、Apache Kafka、Amazon Kinesis Data Streams などの書き込み先にデータを書き込む DataSink を作成します。

戻り値は DataSink

def getSinkWithFormat

def getSinkWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSink

Amazon S3、JDBC、Data Catalog、Apache Kafka、Amazon Kinesis Data Streams などの書き込み先にデータを書き込む DataSink を作成します。書き込み先に書き込むデータの形式も設定します。

  • connectionType - 接続のタイプ。「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。

  • options - データシンクとの接続を確立するための追加情報を提供する JSON 形式の名前/値ペアの文字列。「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。

  • transformationContext – ジョブのブックマークで使用されるシンクに関連付けられた変換コンテキスト。デフォルトでは空に設定されます。

  • format - 書き込み先に書き込むデータの形式。

  • formatOptions — 書き込み先でデータをフォーマットするための追加オプションを提供する JSON 形式の名前と値のペアの文字列。「データ形式に関するオプション」を参照してください。

戻り値は DataSink

def getSource

def getSource( connectionType : String, connectionOptions : JsonOptions, transformationContext : String = "" pushDownPredicate ) : DataSource

Amazon S3、JDBC、AWS Glue Data Catalog などのソースからデータを読み取る DataSource 特性 を作成します。Kafka および Kinesis ストリーミングデータソースもサポートしています。

  • connectionType – データソースの型。「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。

  • connectionOptions – データソースとの接続を確立するための追加情報を提供する JSON 形式の名前と値のペアの文字列。詳細については、「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。

    Kinesis ストリーミングソースには、streamARNstartingPositioninferSchema、および classification の接続オプションが必要です。

    Kafka ストリーミングソースには、connectionNametopicNamestartingOffsetsinferSchema、および classification の接続オプションが必要です。

  • transformationContext – ジョブのブックマークで使用されるシンクに関連付けられた変換コンテキスト。デフォルトでは空に設定されます。

  • pushDownPredicate — パーティション列に関する述語です。

戻り値は DataSource

Amazon Kinesis ストリーミングソースの例:

val kinesisOptions = jsonOptions() data_frame_datasource0 = glueContext.getSource("kinesis", kinesisOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"streamARN": "arn:aws:kinesis:eu-central-1:123456789012:stream/fromOptionsStream", |"startingPosition": "TRIM_HORIZON", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

Kafka ストリーミングソースの例:

val kafkaOptions = jsonOptions() val data_frame_datasource0 = glueContext.getSource("kafka", kafkaOptions).getDataFrame() private def jsonOptions(): JsonOptions = { new JsonOptions( s"""{"connectionName": "ConfluentKafka", |"topicName": "kafka-auth-topic", |"startingOffsets": "earliest", |"inferSchema": "true", |"classification": "json"}""".stripMargin) }

def getSourceWithFormat

def getSourceWithFormat( connectionType : String, options : JsonOptions, transformationContext : String = "", format : String = null, formatOptions : JsonOptions = JsonOptions.empty ) : DataSource

Amazon S3、JDBC、AWS Glue Data Catalog などのソースからのデータ読み取りと、ソースに保存されているデータの形式指定を行うための DataSource 特性 を作成します。

  • connectionType – データソースの型。「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。

  • options – データソースとの接続を確立するための追加情報を提供する、名前と値のペアを示す JSON 形式の文字列。「AWS Glue for Spark での ETL の接続タイプとオプション」を参照してください。

  • transformationContext – ジョブのブックマークで使用するシンクに関連付けられている変換コンテキスト。デフォルトでは空に設定されます。

  • format – ソースに保存されるデータの形式。connectionType が "s3" のときは、format を指定することもできます。"avro"、"csv"、"grokLog"、"ion"、"json"、"xml"、"parquet"、"orc" のいずれかになります。

  • formatOptions – ソースでデータを解析するための追加オプションを提供する、名前と値ペアを示す JSON 形式の文字列。「データ形式に関するオプション」を参照してください。

戻り値は DataSource

データソース (Amazon S3 上のコンマ区切り値 (CSV) ファイル) から DynamicFrame を作成します。

val datasource0 = glueContext.getSourceWithFormat( connectionType="s3", options =JsonOptions(s"""{"paths": [ "s3://csv/nycflights.csv"]}"""), transformationContext = "datasource0", format = "csv", formatOptions=JsonOptions(s"""{"withHeader":"true","separator": ","}""") ).getDynamicFrame()

データソース (JDBC 接続を使用している PostgreSQL) から DynamicFrame を作成します。

val datasource0 = glueContext.getSourceWithFormat( connectionType="postgresql", options =JsonOptions(s"""{ "url":"jdbc:postgresql://databasePostgres-1.rds.amazonaws.com:5432/testdb", "dbtable": "public.company", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

データソース (JDBC 接続を使用している MySQL) からDynamicFrameを作成します。

val datasource0 = glueContext.getSourceWithFormat( connectionType="mysql", options =JsonOptions(s"""{ "url":"jdbc:mysql://databaseMysql-1.rds.amazonaws.com:3306/testdb", "dbtable": "athenatest_nycflights13_csv", "redshiftTmpDir":"", "user":"username", "password":"password123" }"""), transformationContext = "datasource0").getDynamicFrame()

def getSparkSession

def getSparkSession : SparkSession

この GlueContext に関連付けられている SparkSession オブジェクトを取得します。この SparkSession オブジェクトでは、DynamicFrames から作成した DataFrame で使用するテーブルと UDF を登録します。

SparkSession を返します。

def startTransaction

def startTransaction(readOnly: Boolean):String

新しいトランザクションの開始。Lake Formation startTransaction API を内部的に呼び出します。

  • readOnly — (Boolean) このトランザクションを読み取り専用にするか、または読み取りおよび書き込みを行うかを示します。読み取り専用のトランザクション ID を使用した書き込みは拒否されます。読み取り専用トランザクションはコミットする必要はありません。

トランザクション ID を返します。

def commitTrac

def commitTransaction(transactionId: String, waitForCommit: Boolean): Boolean

指定されたトランザクションをコミットしようとします。commitTransaction トランザクションのコミットが完了する前に戻ることがあります。Lake Formation startTransaction API を内部的に呼び出します。

  • transactionId — (文字列) コミットするトランザクション。

  • waitForCommit — (ブール値) commitTransaction がすぐに戻るかどうか指定します。デフォルト値は True です。false の場合、commitTransaction はトランザクションがコミットされるまでポーリングし待機します。最大で 6 回の再試行でエクスポネンシャルバックオフを使用すると、待機時間は 1 分に制限されます。

コミットが完了したかどうかを示すブール値を返します。

def cancelTransaction

def cancelTransaction(transactionId: String): Unit

指定されたトランザクションをキャンセルしようとします。Lake Formation startTransaction API を内部的に呼び出します。

  • transactionId — (文字列) キャンセルするトランザクション。

戻り値は、トランザクションが以前にコミットされた場合は TransactionCommittedException 例外です。

def this

def this( sc : SparkContext, minPartitions : Int, targetPartitions : Int )

指定した SparkContext、最小パーティション数、ターゲットパーティション数を使用して GlueContext オブジェクトを作成します。

  • sc - SparkContext

  • minPartitions - 最小パーティション数。

  • targetPartitions - ターゲットパーティション数。

戻り値は GlueContext

def this

def this( sc : SparkContext )

渡された SparkContextGlueContext オブジェクトを作成します。最小限のパーティション数を 10 に設定し、ターゲットパーティション数を 20 に設定します。

  • sc - SparkContext

戻り値は GlueContext

def this

def this( sparkContext : JavaSparkContext )

渡された JavaSparkContextGlueContext オブジェクトを作成します。最小限のパーティション数を 10 に設定し、ターゲットパーティション数を 20 に設定します。

  • sparkContext - JavaSparkContext

戻り値は GlueContext