AWS Step Functions を使用して ETL パイプラインを検証、変換、パーティショニングでオーケストレーションします - AWS 規範ガイダンス

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

AWS Step Functions を使用して ETL パイプラインを検証、変換、パーティショニングでオーケストレーションします

サンディップ・ガンガパディヤイ (AWS) によって作成されました

コードリポジトリ: aws-step-functions-etl-pipeline-pattern

環境:本稼働

テクノロジー: 分析、ビッグデータ、データレイク DevOps、サーバーレス

AWS サービス: Amazon Athena、AWS Glue、AWS Lambda、AWS Step Functions

[概要]

このパターンは、パフォーマンスとコストを最適化するために大規模な CSV データセットを検証、変換、圧縮、およびパーティション化するサーバーレスの抽出、変換、ロード (ETL) パイプラインを構築する方法を示しています。パイプラインは AWS Step Functions によってオーケストレートされ、自動再試行、エラー処理、ユーザー通知特徴量が含まれています。

CSV ファイルが Amazon Simple Storage Service (Amazon S3) バケットの出典フォルダにアップロードされると、ETL パイプラインが実行を開始します。パイプラインは、ソース CSV ファイルの内容とスキーマを検証し、CSV ファイルを圧縮された Apache Parquet 形式に変換し、データセットを年、月、日ごとにパーティション化し、分析ツールが処理できるように別のフォルダに保存します。

このパターンを自動化するコードは GitHub、AWS Step Functions リポジトリの ETL Pipeline で使用できます。

前提条件と制限

前提条件

  • アクティブなAWS アカウント

  • AWS コマンドラインインターフェイス (AWS CLI) が AWS アカウントでインストールおよび設定されているため、AWS CloudFormation スタックをデプロイして AWS リソースを作成できます。AWS CLI バージョン 2 が推奨されます。(AWS CLI のドキュメントのAWS CLI のインストール、更新とアンインストールを参照してください)。AWS CLI の設定手順については、AWS CLI ドキュメントの 設定と認証情報ファイルの設定 を参照してください。

  • Amazon S3 バケット。

  • 正しいスキーマの CSV データセット。(このパターンに含まれる コードリポジトリ には、使用できる正しいスキーマとデータ型が記載されたサンプル CSV ファイルが用意されています)。

  • AWS マネジメントコンソールでの使用がサポートされているウェブブラウザ。 でサポートされるブラウザのリストについては、を参照してください。

  • AWS Glue; コンソールへのアクセス。

  • AWS Step Functions コンソールにアクセスします。

制約事項

製品バージョン

  • AWS Lambda 用の Python 3.11

  • AWS Glue バージョン 2.0

アーキテクチャ

S3 ソースバケットから Step Functions、AWS Glue、Amazon SNS を 10 ステップで経由する ETL プロセス。

この図に示されているワークフローは、以下の大まかなステップで構成されています。

  1. ユーザーは CSV ファイルを Amazon S3 のソースフォルダにアップロードします。

  2. Amazon S3 通知イベントは、ステップファンクションステートマシンを起動する AWS Lambda 関数を開始します。

  3. Lambda 関数は、未加工の CSV ファイルのスキーマとデータ型を検証します。

  4. 検証結果に応じて:

    1. ソースファイルの検証が成功すると、ファイルはステージフォルダーに移動してさらに処理されます。

    2. 検証に失敗すると、ファイルはエラーフォルダに移動し、Amazon Simple Notification Service (Amazon SNS) を通じてエラー通知が送信されます。

  5. AWS Glue クローラーは Amazon S3 のステージフォルダから未加工ファイルのスキーマを作成します。

  6. AWS Glue ジョブは、未加工ファイルを Parquet 形式に変換、圧縮、およびパーティション化します。

  7. また、AWS Glue ジョブはファイルを Amazon S3 のトランスフォームフォルダに移動します。

  8. AWS Glue クローラーは、変換されたファイルからスキーマを作成します。生成されたスキーマは、どの分析ジョブでも使用できます。 を使用して、Amazon Athena でクエリを実行することができます。

  9. パイプラインがエラーなしで完了すると、スキーマファイルはアーカイブフォルダーに移動されます。エラーが発生した場合、ファイルは代わりにエラーフォルダーに移動されます。

  10. Amazon SNS は、パイプラインの完了ステータスに基づいて成功または失敗を示す通知を送信します。

このパターンで使用されるすべての AWS リソースはサーバーレスです。管理するサーバーはありません。

ツール

AWS サービス

  • AWS Glue — AWS Glue は、分析のためにデータを簡単に準備してロードできるフルマネージドのETLサービスです。

  • AWS Step Functionsは、AWS Lambda関数と他のサービスを組み合わせてビジネスクリティカルなアプリケーションを構築できるサーバーレスオーケストレーションサービスです。AWS Step Functions のグラフィカルコンソールでは、アプリケーションのワークフローを一連のイベント駆動型ステップとして確認できます。

  • Amazon S3 – Amazon Simple Storage Service (Amazon S3) は、業界をリードするスケーラビリティ、データ可用性、セキュリティ、およびパフォーマンスを提供するオブジェクトストレージサービスです。

  • Amazon SNS — Amazon Simple Notification Service (Amazon SNS) は、マイクロサービス、分散システム、サーバーレスアプリケーションを分離できる、可用性が高く、耐久性があり、安全な、完全マネージド型のパブ/サブメッセージングサービスです。

  • AWS Lambda – AWS Lambda はサーバーをプロビジョニングしたり管理しなくてもコードを実行できるコンピューティングサービスです。AWS Lambda は必要に応じてコードを実行し、1 日あたり数個のリクエストから 1 秒あたり数千のリクエストまで自動的にスケーリングします。

コード

このパターンのコードは GitHub、AWS Step Functions リポジトリの ETL Pipeline で入手できます。コードリポジトリには以下のファイルとフォルダが含まれています。

  • template.yml – AWS Step Functions で ETL パイプラインを作成するための AWS CloudFormation テンプレート。

  • parameter.json — すべてのパラメータとパラメータ値が含まれます。エピック セクションで説明されているように、このファイルを更新してパラメータ値を変更します。

  • myLayer/python フォルダー — このプロジェクトに必要な AWS Lambda レイヤーを作成するために必要な Python パッケージが含まれています。

  • lambda フォルダー — 次の Lambda 関数が含まれます。

    • move_file.py — ソースデータセットをアーカイブフォルダー、トランスフォームフォルダー、またはエラーフォルダーに移動します。

    • check_crawler.py — 失敗メッセージを送信する前に、RETRYLIMIT  環境変数で設定された回数だけ AWS Glue クローラーのステータスを確認します。

    • start_crawler.py — AWS Glue クローラーを起動します。

    • start_step_function.py — AWS Step Functions を開始します。

    • start_codebuild.py – AWS CodeBuild プロジェクトを開始します。

    • validation.py — 入力された未加工データセットを検証します。

    • s3object.py — S3 バケット内に必要なディレクトリ構造を作成します。

    • notification.py — パイプラインの最後に成功通知またはエラー通知を送信します。

これらのファイルを使用するには、エピックセクションの指示に従ってください。

エピック

タスク説明必要なスキル

AWS SAM コードリポジトリを複製します。

  1. AWS Step Functions リポジトリで ETL パイプライン を開きます。

  2. メインリポジトリページのファイルリストの上にある コード を選択し、Clone with HTTPS の下にある URL をコピーします。

  3. コマンドラインインターフェースで、作業ディレクトリをサンプルファイルを保存する場所に変更します。

  4. ターミナルまたはコマンドラインプロンプトで、 コマンドを実行します。

    git clone <repoURL>

    where はステップ 2 でコピーした URL <repoURL> を指します。

開発者

パラメータ値を更新します。

リポジトリのローカルコピーで、parameter.json ファイルを編集し、以下のようにデフォルトのパラメータ値を更新します。

  • pS3BucketName — データセットを保存する S3 バケットの名前。このバケットはテンプレートによって自動的に作成されます。バケット名はグローバルに一意である必要があります。

  • pSourceFolder — ソース CSV ファイルのアップロードに使用される S3 バケット内のフォルダの名前。

  • pStageFolder — 処理中にステージング領域として使用される S3 バケット内のフォルダの名前。

  • pTransformFolder — 変換および分割されたデータセットの保存に使用される S3 バケット内のフォルダの名前。

  • pErrorFolder — 検証できない場合にソース CSV ファイルの移動先となる S3 バケット内のフォルダー。

  • pArchiveFolder  — ソース CSV ファイルのアップロードに使用される S3 バケット内のフォルダの名前。

  • pEmailforNotification — 成功/エラー通知を受信するための有効な E メールアドレス。

  • pPrefix ─ AWS Glue クローラー名で使用されるプレフィックス文字列。

  • pDatasetSchema — ソースファイルの検証対象となるデータセットスキーマ。Cerberus Python パッケージは、ソースデータセットの検証に使用されます。詳細については、Cerberus のウェブサイトを参照してください。

開発者

S3 バケットにソースコードをアップロードする。

ETL パイプラインを自動化する CloudFormation テンプレートをデプロイする前に、 CloudFormation テンプレートのソースファイルをパッケージ化し、S3 バケットにアップロードする必要があります。そのためには、設定済みのプロファイルで以下の AWS CLI コマンドを次のように実行します。

aws cloudformation package --template-file template.yml --s3-bucket <bucket_name> --output-template-file packaged.template --profile <profile_name>

各パラメータの意味は次のとおりです。

  • <bucket_name> は、スタックをデプロイする AWS リージョンにある既存の S3 バケット名です。このバケットは、 CloudFormation テンプレートのソースコードパッケージを保存するために使用されます。

  • <profile_name> は、AWS CLI をセットアップしたときに事前設定した有効な AWS CLI プロファイルです。

開発者
タスク説明必要なスキル

CloudFormation テンプレートをデプロイします。

CloudFormation テンプレートをデプロイするには、次の AWS CLI コマンドを実行します。

aws cloudformation deploy --stack-name <stack_name> --template-file packaged.template --parameter-overrides file://parameter.json --capabilities CAPABILITY_IAM --profile <profile_name>

各パラメータの意味は次のとおりです。

  • <stack_name> はスタックの一意の識別子です CloudFormation 。

  • <profile-name> は事前設定された AWS CLI プロファイルです。

開発者

進捗確認。

AWS CloudFormation コンソール で、スタック開発の進行状況を確認します。ステータスが CREATE_COMPLETE の場合、スタックは正常にデプロイされています。

開発者

AWS Glue データベース名を書き留めておきます。

スタックの Outputs タブには、AWS Glue データベースの名前が表示されます。キー名は、GlueDBOutput です。

開発者
タスク説明必要なスキル

ETL パイプラインを開始します。

  1. S3 source バケット内のソースフォルダ (または parameter.json ファイルに設定したフォルダ名) に移動します。

  2. サンプル CSV ファイルをこのフォルダにアップロードします。(コードリポジトリには、使用できる Sample_Bank_Transaction_Raw_Dataset.csv というサンプルファイルが用意されています)。ファイルをアップロードすると、Step Functions を通じて ETL パイプラインが開始されます。

  3. Step Functions コンソール で ETL パイプラインのステータスを確認します。

開発者

パーティション化されたデータセットを確認します。

ETL パイプラインが完了したら、パーティション化されたデータセットが Amazon S3 トランスフォームフォルダ (transform または parameter.json ファイルに設定したフォルダ名) にあることを確認します。

開発者

パーティション化された AWS Glue データベースを確認します。

  1. AWS Glue コンソール で、スタックによって作成された AWS Glue データベース (これは前のエピックでメモしたデータベースです) を選択します。

  2. AWS Glue データカタログでパーティションテーブルが使用できることを確認します。

開発者

クエリを実行する。

(オプション) Amazon Athena を使用して、パーティション分割され変換されたデータベースでアドホッククエリを実行します。手順については、AWS ドキュメントの Amazon Athena を使用した SQL クエリの実行 を参照してください。

データベースアナリスト

トラブルシューティング

問題ソリューション

AWS Glue ジョブとクローラーの AWS Identity and Access Management (IAM) アクセス許可 AWS Glue

AWS Glue ジョブまたはクローラをさらにカスタマイズする場合は、AWS Glue ジョブで使用される IAM ロールで適切な IAM アクセス許可を付与するか、AWS Lake Formation にデータアクセス許可を付与してください。詳細については、AWS ドキュメントを参照してください。

関連リソース

AWS のドキュメント

追加情報

次の図は、Step Functions Inspectorパネル からの ETL パイプラインを成功させるための AWS Step Functions ワークフローを示しています。

入力 .csv を検証し、データをクローリングし、AWS Glue ジョブを実行するための Step Functions ワークフロー。

以下の図は、入力検証エラーが原因で失敗した ETL パイプラインの AWS Step Functions ワークフローを、Step Functions Inspector パネルから示しています。

Step Functions ワークフローが失敗し、ファイルがエラーフォルダに移動しました。