翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。
データスキャンの量を減らす
まず、必要なデータのみをロードすることを検討してください。各データソースの Spark クラスターにロードされるデータの量を減らすだけで、パフォーマンスを向上させることができます。このアプローチが適切かどうかを評価するには、次のメトリクスを使用します。
Spark UI セクションで説明されているように、CloudWatchメトリクスの Amazon S3 からの読み取りバイト数と Spark UI の詳細を確認できます。
CloudWatch メトリクス
Amazon S3 からのおおよその読み込みサイズは、ETLデータ移動 (バイト) で確認できます。このメトリクスは、前のレポート以降のすべてのエグゼキュターによって Amazon S3 から読み取られたバイト数を示します。これを使用して Amazon S3 からのETLデータ移動をモニタリングし、読み取りと外部データソースからの取り込みレートを比較できます。
S3 バイト読み取りデータポイントが予想よりも大きい場合は、次の解決策を検討してください。
Spark UI
for Spark UI AWS Glue のステージタブには、入力サイズと出力サイズが表示されます。次の例では、ステージ 2 は 47.4 GiB の入力と 47.7 GiB の出力を読み取り、ステージ 5 は 61.2 MiB の入力と 56.6 MiB の出力を読み取ります。
AWS Glue ジョブで Spark SQLまたは DataFrame アプローチを使用すると、SQL/D ataFrame タブにこれらのステージに関する統計情報が表示されます。この場合、ステージ 2 には、読み取りファイル数: 430、読み取りファイルサイズ: 47.4 GiB、出力行数: 160,796,570 が表示されます。
読み込むデータと使用しているデータのサイズに大きな違いがあることがわかった場合は、次のソリューションを試してください。
Amazon S3
Amazon S3 から読み取るときにジョブにロードされるデータの量を減らすには、データセット のファイルサイズ、圧縮、ファイル形式、ファイルレイアウト (パーティション) を考慮してください。Spark ジョブ AWS Glue の は、ETL多くの場合 raw データの に使用されますが、効率的な分散処理のためには、データソース形式の機能を検査する必要があります。
-
ファイルサイズ – 入力と出力のファイルサイズは、中程度の範囲 (128 MB など) にしておくことをお勧めします。ファイルが小さすぎたり、大きすぎたりすると、問題が発生する可能性があります。
小さなファイルが多数あると、次の問題が発生します。
-
多くのオブジェクト (同じ量のデータを保存する少数のオブジェクトと比較して
Head
) に対してリクエストを行うために必要なオーバーヘッド (List
、Get
、または など) により、Amazon S3 でのネットワーク I/O 負荷が高くなります。 -
Spark ドライバーの負荷の高い I/O と処理負荷。これにより、多くのパーティションとタスクが生成され、過剰な並列処理が発生します。
一方、ファイルタイプが分割可能でなく (gzip など)、ファイルが大きすぎる場合、Spark アプリケーションは 1 つのタスクでファイル全体の読み取りが完了するまで待機する必要があります。
小さなファイルごとに Apache Spark タスクを作成するときに発生する過剰な並列処理を減らすには、 のファイルグループ DynamicFramesを使用します。このアプローチにより、Spark ドライバーからOOM例外が発生する可能性が低くなります。ファイルグループを設定するには、
groupFiles
およびgroupSize
パラメータを設定します。次のコード例では、 AWS Glue DynamicFrame APIこれらのパラメータを持つ ETLスクリプトで を使用します。dyf = glueContext.create_dynamic_frame_from_options("s3", {'paths': ["s3://input-s3-path/"], 'recurse':True, 'groupFiles': 'inPartition', 'groupSize': '1048576'}, format="json")
-
-
圧縮 — S3 オブジェクトが数百メガバイト内にある場合は、圧縮することを検討してください。圧縮形式にはさまざまな種類があり、大まかに 2 つのタイプに分類できます。
-
gzip などの分割不可能な 圧縮形式では、ファイル全体が 1 人のワーカーによって解凍される必要があります。
-
bzip2 や LZO (インデックス付き) などの分割可能な圧縮形式を使用すると、ファイルを部分的に解凍でき、並列化できます。
Spark (およびその他の一般的な分散処理エンジン) では、ソースデータファイルをエンジンが並行して処理できるチャンクに分割します。これらの単位は、分割 と呼ばれることがよくあります。データが分割可能な形式になると、最適化された AWS Glue リーダーは、特定のブロックのみを取得する
Range
オプションGetObject
APIを に提供することで、S3 オブジェクトから分割を取得できます。次の図を参照して、これが実際にどのように機能するかを確認してください。圧縮されたデータは、ファイルが最適なサイズであるか、ファイルが分割可能である限り、アプリケーションを大幅に高速化できます。データサイズが小さいほど、Amazon S3 からスキャンされるデータと、Amazon S3 から Spark クラスターへのネットワークトラフィックが減少します。一方、データを圧縮および解凍するには、さらに多くの CPUが必要です。必要なコンピューティングの量は、圧縮アルゴリズムの圧縮率でスケーリングされます。分割可能な圧縮形式を選択するときは、このトレードオフを考慮してください。
注記
gzip ファイルは一般に分割できませんが、gzip を使用して個々の Parquet ブロックを圧縮し、それらのブロックを並列化できます。
-
-
ファイル形式 – 列指向形式を使用します。Apache Parquet
と Apache ORC は一般的な列指向データ形式です。列ベースの圧縮、データ型に基づく各列のエンコードと圧縮を採用することで、データを効率的に解析してORC保存します。Parquet エンコーディングの詳細については、「Parquet エンコーディング定義 」を参照してください。Parquet ファイルも分割可能です。 Columnar は、値を列ごとにグループ化し、ブロックにまとめて保存します。列指向形式を使用する場合、使用する予定のない列に対応するデータのブロックをスキップできます。Spark アプリケーションは、必要な列のみを取得できます。一般的に、圧縮率が高いか、データブロックをスキップするとAmazon S3から読み取るバイト数が少なくなり、パフォーマンスが向上します。どちらの形式も、I/O を減らすための以下のプッシュダウンアプローチをサポートしています。
-
射影プッシュダウン — 射影プッシュダウンは、アプリケーションで指定された列のみを取得する手法です。次の例に示すように、Spark アプリケーションで列を指定します。
-
DataFrame 例:
df.select("star_rating")
-
Spark SQLの例:
spark.sql("select start_rating from <table>")
-
-
述語のプッシュダウン — 述語のプッシュダウンは、
WHERE
およびGROUP BY
句を効率的に処理するための手法です。どちらの形式にも、列の値を表すデータのブロックがあります。各ブロックは、最大値や最小値など、ブロックの統計を保持します。Spark は、これらの統計を使用して、アプリケーションで使用されるフィルター値に応じてブロックを読み取るかスキップするかを決定できます。この機能を使用するには、次の例に示すように、条件にフィルターを追加します。-
DataFrame 例:
df.select("star_rating").filter("star_rating < 2")
-
Spark SQLの例:
spark.sql("select * from <table> where star_rating < 2")
-
-
-
ファイルレイアウト – S3 データを、データの使用方法に基づいて異なるパスのオブジェクトに保存することで、関連するデータを効率的に取得できます。詳細については、Amazon S3ドキュメント」の「プレフィックスを使用してオブジェクトを整理する」を参照してください。 AWS Glue は、キーと値を 形式で Amazon S3 プレフィックスに保存し
key=value
、Amazon S3 パスでデータをパーティション化することをサポートしています。データをパーティション化することで、各ダウンストリームの分析アプリケーションでスキャンされるデータ量を制限し、パフォーマンスを向上させ、コストを削減できます。詳細については、「 でのETL出力のパーティションの管理 AWS Glue」を参照してください。パーティション分割はテーブルをさまざまな部分に分割し、次の例に示すように、年 、月 、日 などの列値に基づいて関連するデータをグループ化されたファイルに保持します。
# Partitioning by /YYYY/MM/DD s3://<YourBucket>/year=2023/month=03/day=31/0000.gz s3://<YourBucket>/year=2023/month=03/day=01/0000.gz s3://<YourBucket>/year=2023/month=03/day=02/0000.gz s3://<YourBucket>/year=2023/month=03/day=03/0000.gz ...
データセットのパーティションは、 のテーブルでモデル化することで定義できます AWS Glue Data Catalog。その後、次のようにパーティションプルーニングを使用してデータスキャンの量を制限できます。
-
には AWS Glue DynamicFrame、
push_down_predicate
(またはcatalogPartitionPredicate
) を設定します。dyf = Glue_context.create_dynamic_frame.from_catalog( database=src_database_name, table_name=src_table_name, push_down_predicate = "year='2023' and month ='03'", )
-
Spark の場合 DataFrame、パーティションをプルーニングする固定パスを設定します。
df = spark.read.format("json").load("s3://<YourBucket>/year=2023/month=03/*/*.gz")
-
Spark ではSQL、データカタログからパーティションをプルーニングする where 句を設定できます。
df = spark.sql("SELECT * FROM <Table> WHERE year= '2023' and month = '03'")
-
を使用してデータを書き込むときに日付でパーティション化するには AWS Glue、次のように列partitionKeysの日付情報 DataFrame を使用して DynamicFrame または partitionBy()
を に設定します。 -
DynamicFrame
glue_context.write_dynamic_frame_from_options( frame= dyf, connection_type='s3',format='parquet' connection_options= { 'partitionKeys': ["year", "month", "day"], 'path': 's3://<YourBucket>/<Prefix>/' } )
-
DataFrame
df.write.mode('append')\ .partitionBy('year','month','day')\ .parquet('s3://<YourBucket>/<Prefix>/')
これにより、出力データのコンシューマーのパフォーマンスを向上させることができます。
入力データセットを作成するパイプラインを変更するためのアクセス権がない場合、パーティショニングはオプションではありません。代わりに、glob パターンを使用して不要な S3 パスを除外できます。で読み取るときに除外を設定します DynamicFrame。例えば、次のコードでは、2023 年の 01~09 か月の日数を除外しています。
dyf = glueContext.create_dynamic_frame.from_catalog( database=db, table_name=table, additional_options = { "exclusions":"[\"**year=2023/month=0[1-9]/**\"]" }, transformation_ctx='dyf' )
データカタログのテーブルプロパティで除外を設定することもできます。
-
キー:
exclusions
-
値:
["**year=2023/month=0[1-9]/**"]
-
-
-
Amazon S3 パーティションが多すぎる – 数千の値を含む ID 列など、幅広い値を含む列で Amazon S3 データをパーティション化することは避けてください。これにより、可能なパーティションの数はパーティション分割したすべてのフィールドの積であるため、バケット内のパーティションの数が大幅に増加する可能性があります。パーティションが多すぎると、以下が発生する可能性があります。
-
Data Catalog からパーティションメタデータを取得する際のレイテンシーの増加
-
Amazon S3 APIリクエスト (、
List
、Get
およびHead
) をさらに必要とする小さなファイルの数の増加
例えば、
partitionBy
または で日付タイプを設定するとpartitionKeys
、 などの日付レベルのパーティショニングyyyy/mm/dd
は多くのユースケースに適しています。ただし、 は多数のパーティションを生成するため、パフォーマンス全体に悪影響を与えるyyyy/mm/dd/<ID>
可能性があります。一方、リアルタイム処理アプリケーションなどの一部のユースケースでは、 などの多くのパーティションが必要です
yyyy/mm/dd/hh
。ユースケースでかなりのパーティションが必要な場合は、AWS Glue パーティションインデックスを使用して Data Catalog からパーティションメタデータを取得する際のレイテンシーを短縮することを検討してください。 -
データベースと JDBC
データベースから情報を取得するときにデータスキャンを減らすには、SQLクエリで述where
語 (または 句) を指定します。SQL インターフェイスを提供しないデータベースは、クエリまたはフィルタリングのための独自のメカニズムを提供します。
Java Database Connectivity (JDBC) 接続を使用する場合は、次のパラメータの where
句を含む Select クエリを指定します。
-
には DynamicFrame、 sampleQueryオプションを使用します。を使用する場合は
create_dynamic_frame.from_catalog
、引additional_options
数を次のように設定します。query = "SELECT * FROM <TableName> where id = 'XX' AND" datasource0 = glueContext.create_dynamic_frame.from_catalog( database = db, table_name = table, additional_options={ "sampleQuery": query, "hashexpression": key, "hashpartitions": 10, "enablePartitioningForSampleQuery": True }, transformation_ctx = "datasource0" )
の場合
using create_dynamic_frame.from_options
、引connection_options
数を次のように設定します。query = "SELECT * FROM <TableName> where id = 'XX' AND" datasource0 = glueContext.create_dynamic_frame.from_options( connection_type = connection, connection_options={ "url": url, "user": user, "password": password, "dbtable": table, "sampleQuery": query, "hashexpression": key, "hashpartitions": 10, "enablePartitioningForSampleQuery": True } )
-
には DataFrame、 クエリ
オプションを使用します。 query = "SELECT * FROM <TableName> where id = 'XX'" jdbcDF = spark.read \ .format('jdbc') \ .option('url', url) \ .option('user', user) \ .option('password', pwd) \ .option('query', query) \ .load()
-
Amazon Redshift では、 AWS Glue 4.0 以降を使用して Amazon Redshift Spark コネクタ のプッシュダウンサポートを活用します。
dyf = glueContext.create_dynamic_frame.from_catalog( database = "redshift-dc-database-name", table_name = "redshift-table-name", redshift_tmp_dir = args["temp-s3-dir"], additional_options = {"aws_iam_role": "arn:aws:iam::role-account-id:role/rs-role-name"} )
-
他のデータベースについては、そのデータベースのドキュメントを参照してください。