ジョブのブックマークを使用する
Spark 用の AWS Glue はジョブのブックマークを使用して処理済みのデータを追跡します。ジョブのブックマーク機能の概要とサポート内容については、「ジョブのブックマークを使用した処理済みデータの追跡」を参照してください。ブックマークを使用して AWS Glue ジョブをプログラミングすると、ビジュアルジョブでは実現できない柔軟性が得られます。
-
JDBC から読み込む場合、AWS Glue スクリプトのブックマークキーとして使用する列を指定できます。
-
各メソッド呼び出しに適用する
transformation_ctx
を選択できます。
常に適切に設定されたパラメータでスクリプトの先頭に job.init
を呼び出して、末尾に job.commit
を呼び出します。これら 2 つの関数は、ブックマークサービスを初期化し、サービスの状態変化を更新します。ブックマークは呼び出さないと機能しません。
ブックマークキーの指定
JDBC ワークフローの場合、ブックマークはキーフィールドの値をブックマークされた値と比較することで、ジョブがどの行を読み込んたかを追跡します。これは Amazon S3 ワークフローには必要ありませんし、適用可能でもありません。ビジュアルエディタを使用せずに AWS Glue スクリプトを記述する場合、どの列をブックマークで追跡するかを指定できます。複数の列を指定することもできます。ユーザー定義のブックマークキーを指定する場合、値の順序にギャップがあってもかまいません。
警告
ユーザー定義のブックマークキーを使用する場合、キーはそれぞれ厳密に一定間隔で増減する必要があります。複合キーに追加のフィールドを選択しても、「マイナーバージョン」や「リビジョン番号」などの概念のフィールドは、値がデータセット全体で再利用されるため、この基準を満たしません。
jobBookmarkKeys
および jobBookmarkKeysSortOrder
は以下の方法で指定できます。
-
create_dynamic_frame.from_catalog
—additional_options
を使用する。 -
create_dynamic_frame.from_options
—connection_options
を使用する。
変換コンテキスト
AWS Glue PySpark の動的フレームの多くのメソッドには、transformation_ctx
というオプションのパラメータが含まれています。このパラメータは ETL 演算子インスタンスの一意の識別子です。transformation_ctx
パラメータは指定された演算子に対するジョブのブックマーク内の状態情報を識別するために使用されます。具体的には、AWS Glue では transformation_ctx
を使用してブックマーク状態に対するキーにインデックスを付けます。
警告
transformation_ctx
は、スクリプト内の特定のソースのブックマーク状態を検索するためのキーとして機能します。ブックマークを正しく機能させるためには、ソースと関連する transformation_ctx
の一貫性を常に保つ必要があります。ソースプロパティを変更したり、transformation_ctx
の名前を変更したりすると、前のブックマークが無効になり、タイムスタンプベースのフィルタリングで正しい結果が得られない場合があります。
ジョブのブックマークが正しく機能するように、ジョブのブックマークのパラメータを有効にし、transformation_ctx
パラメータを設定します。transformation_ctx
パラメータを渡さない場合、メソッドで使用されている動的フレームやテーブルに対してジョブのブックマークは有効になりません。例えば、ETL ジョブで 2 つの Amazon S3 ソースを読み取って結合する場合、ブックマークを有効にするメソッドに対してのみ transformation_ctx
パラメータを渡すことができます。1 つのジョブについてジョブのブックマークをリセットした場合、transformation_ctx
の使用には関係なく、このジョブに関連付けられているすべての変換がリセットされます。
DynamicFrameReader
クラスの詳細については、「DynamicFrameReader クラス」を参照してください。PySpark 拡張の詳細については、「AWS Glue PySpark 拡張機能リファレンス」を参照してください
例
Amazon S3 データソース用に生成されたスクリプトの例を次に示します。ジョブのブックマークを使用するために必要なスクリプトの部分は、斜体で表示されています。これらの要素の詳細については、GlueContext クラス API および DynamicFrameWriter クラス API を参照してください。
# Sample Script import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "database", table_name = "relatedqueries_csv", transformation_ctx = "datasource0" ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("col0", "string", "name", "string"), ("col1", "string", "number", "string")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://input_path"}, format = "json", transformation_ctx = "datasink2" ) job.commit()
JDBC ソース用に生成されたスクリプトの例を以下に示します。ソーステーブルは、empno
列がプライマリキーである従業員テーブルです。デフォルトでは、ブックマークキーが指定されていない場合、ジョブはブックマークキーとしてシーケンシャルプライマリキーを使用しますが、empno
は必ずしもシーケンシャルではなく、値にギャップがある可能性があるため、デフォルトのブックマークキーとして適切ではありません。したがって、スクリプトは empno
を明示的にブックマークキーとして指定します。コードのその部分は、斜体で表示されます。
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) datasource0 = glueContext.create_dynamic_frame.from_catalog( database = "hr", table_name = "emp", transformation_ctx = "datasource0", additional_options = {"jobBookmarkKeys":["empno"],"jobBookmarkKeysSortOrder":"asc"} ) applymapping1 = ApplyMapping.apply( frame = datasource0, mappings = [("ename", "string", "ename", "string"), ("hrly_rate", "decimal(38,0)", "hrly_rate", "decimal(38,0)"), ("comm", "decimal(7,2)", "comm", "decimal(7,2)"), ("hiredate", "timestamp", "hiredate", "timestamp"), ("empno", "decimal(5,0)", "empno", "decimal(5,0)"), ("mgr", "decimal(5,0)", "mgr", "decimal(5,0)"), ("photo", "string", "photo", "string"), ("job", "string", "job", "string"), ("deptno", "decimal(3,0)", "deptno", "decimal(3,0)"), ("ssn", "decimal(9,0)", "ssn", "decimal(9,0)"), ("sal", "decimal(7,2)", "sal", "decimal(7,2)")], transformation_ctx = "applymapping1" ) datasink2 = glueContext.write_dynamic_frame.from_options( frame = applymapping1, connection_type = "s3", connection_options = {"path": "s3://hr/employees"}, format = "csv", transformation_ctx = "datasink2" ) job.commit()