Amazon SageMaker Feature Store Spark でのバッチ取り込み - Amazon SageMaker

翻訳は機械翻訳により提供されています。提供された翻訳内容と英語版の間で齟齬、不一致または矛盾がある場合、英語版が優先します。

Amazon SageMaker Feature Store Spark でのバッチ取り込み

Amazon SageMaker Feature Store Spark は、Spark ライブラリを Feature Store に接続する Spark コネクタです。Feature Store Spark は、Spark DataFrame から特徴量グループへのデータインジェストを簡素化します。Feature Store は、既存のETLパイプライン、Amazon 、、GIS AWS Glue ジョブEMR、Amazon SageMaker Processing ジョブ、ノートブックを使用した Spark でのバッチデータの取り込みをサポートします SageMaker 。

Python と Scala デベロッパー向けに、バッチデータインジェストをインストールして実装するためのメソッドが用意されています。Python デベロッパーは、Amazon SageMaker Feature Store Spark GitHub リポジトリ の手順に従って、オープンソースsagemaker-feature-store-pysparkの Python ライブラリをローカル開発、Amazon へのインストールEMR、Jupyter Notebooks に使用できます。Scala デベロッパーは、Amazon Feature Store Spark SDK Maven 中央リポジトリ で利用可能な SageMaker Feature Store Spark コネクタを使用できます。

Spark コネクタを使用して、オンラインストア、オフラインストア、あるいはその両方が有効になっているかどうかに応じて、次の方法でデータを取り込むことができます。

  1. デフォルトで取り込み — オンラインストアが有効になっている場合、Spark コネクタはまず を使用してデータフレームをオンラインストアに取り込みますPutRecordAPI。イベント時間が最も長いレコードのみがオンラインストアに残ります。オフラインストアが有効になっている場合、Feature Store は 15 分以内にデータフレームをオフラインストアに取り込みます。オンラインストアとオフラインストアの仕組みの詳細については、「Feature Store の概念」を参照してください。

    これを実行するには .ingest_data(...) メソッドで target_stores を指定しないようにします。

  2. オフラインストアに直接取り込む — オフラインストアが有効になっている場合、Spark コネクタはデータフレームをオフラインストアに直接バッチ取り込みします。データフレームをオフラインストアに直接取り込んでも、オンラインストアは更新されません。

    これを実行するには .ingest_data(...) メソッドで target_stores=["OfflineStore"] を指定します。

  3. オンラインストアのみ – オンラインストアが有効になっている場合、Spark コネクタは を使用してデータフレームをオンラインストアに取り込みますPutRecordAPI。データフレームをオンラインストアに直接取り込んでも、オフラインストアは更新されません。

    これを実行するには .ingest_data(...) メソッドで target_stores=["OnlineStore"] を指定します。

さまざまな取り込み方法の詳細については、「実装例」を参照してください。

Feature Store Spark のインストール

Scala ユーザー

Feature Store Spark SDKは、Scala ユーザーの Amazon SageMaker Feature Store Spark SDK Maven 中央リポジトリで使用できます。

要件

  • Spark >= 3.0.0 と <= 3.3.0

  • iceberg-spark-runtime >= 0.14.0

  • Scala >= 2.12.x 

  • Amazon EMR >= 6.1.0 (Amazon を使用している場合のみEMR)

依存関係を POM.xml で宣言する

Feature Store Spark コネクタは iceberg-spark-runtime ライブラリに依存しています。そのため、Iceberg テーブル形式で自動作成した特徴量グループにデータを取り込む場合は、iceberg-spark-runtime ライブラリの対応するバージョンを依存関係に追加する必要があります。例えば、Spark 3.1 を使用している場合は、プロジェクトの POM.xml で以下を宣言する必要があります。

<dependency> <groupId>software.amazon.sagemaker.featurestore</groupId> <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId> <version>0.14.0</version> </dependency>

Python ユーザー

Feature Store Spark SDKは、オープンソースの Amazon SageMaker Feature Store Spark GitHub リポジトリ で使用できます。

要件

  • Spark >= 3.0.0 と <= 3.3.0

  • Amazon EMR >= 6.1.0 (Amazon を使用している場合のみEMR)

  • カーネル = conda_python3

Spark がインストールされているディレクトリに $SPARK_HOME を設定することをお勧めします。インストール中、Feature Store は必要な JARを にアップロードするためSPARK_HOME、依存関係は自動的にロードされます。この PySpark ライブラリを機能させるには、Spark の開始 JVM が必要です。

ローカルインストール

インストールの詳細情報を確認するには、次のインストールコマンドに --verbose を追加して詳細モード有効にします。

pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Amazon へのインストール EMR

リリースバージョン 6.1.0 以降の Amazon EMRクラスターを作成します。SSH を使用すると、問題のトラブルシューティングに役立ちます。

次のどちらかを実行するとライブラリをインストールできます。

  • Amazon 内でカスタムステップを作成しますEMR。

  • を使用してクラスターに接続SSHし、そこからライブラリをインストールします。

注記

以下の情報は Spark バージョン 3.1 を使用していますが、要件を満たす任意のバージョンを指定できます。

export SPARK_HOME=/usr/lib/spark sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
注記

依存関係を SPARK_ JARsに自動的にインストールする場合はHOME、ブートストラップステップを使用しないでください。

SageMaker ノートブックインスタンスへのインストール

次のコマンドを使用して、Spark コネクタと PySpark 互換性のある のバージョンをインストールします。

!pip3 install pyspark==3.1.1 !pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

オフラインストアへのバッチ取り込みを実行する場合、ノートブックインスタンス環境内には依存関係はありません。

from pyspark.sql import SparkSession import feature_store_pyspark extra_jars = ",".join(feature_store_pyspark.classpath_jars()) spark = SparkSession.builder \ .config("spark.jars", extra_jars) \ .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \ .getOrCreate()

を使用したノートブックへのインストール GIS

重要

AWS Glue バージョン 2.0 以降を使用する必要があります。

インタラクティブセッション () に PySpark コネクタをインストールするには、 AWS Glue 次の情報を使用しますGIS。

Amazon SageMaker Feature Store Spark では、セッションの初期化JAR中に特定の Spark コネクタを Amazon S3 バケットにアップロードする必要があります。S3 バケットJARに必要な のアップロードの詳細については、「」を参照してくださいFeature Store Spark JARの の取得

をアップロードしたらJAR、次のコマンドJARを使用して GIS セッションに を指定する必要があります。

%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar

AWS Glue ランタイムに Feature Store Spark をインストールするには、 で指定したモジュールpipに対して GIS notebook. AWS Glue runs 内の%additional_python_modulesマジックコマンドを使用します%additional_python_modules

%additional_python_modules sagemaker-feature-store-pyspark-3.1

AWS Glue セッションを開始する前に、前述のマジックコマンドの両方を使用する必要があります。

AWS Glue ジョブへのインストール

重要

AWS Glue バージョン 2.0 以降を使用する必要があります。

AWS Glue ジョブに Spark コネクタをインストールするには、次の例に示すように、 --extra-jars引数を使用して必要な を指定JARsし、ジョブを作成するときに Spark Connector を AWS Glue ジョブパラメータとして--additional-python-modulesインストールします。S3 バケットJARに必要な のアップロードの詳細については、「」を参照してくださいFeature Store Spark JARの の取得

glue_client = boto3.client('glue', region_name=region) response = glue_client.create_job( Name=pipeline_id, Description='Feature Store Compute Job', Role=glue_role_arn, ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run}, Command={ 'Name': 'glueetl', 'ScriptLocation': script_location_uri, 'PythonVersion': '3' }, DefaultArguments={ '--TempDir': temp_dir_location_uri, '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1', '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar", ... }, MaxRetries=3, NumberOfWorkers=149, Timeout=2880, GlueVersion='3.0', WorkerType='G.2X' )

Amazon SageMaker Processing ジョブへのインストール

Amazon SageMaker Processing ジョブで Feature Store Spark を使用するには、独自のイメージを作成します。独自のイメージの持ち込みの詳細については、「独自の SageMaker イメージを持参する」を参照してください。Dockerfile インストールステップを追加します。Docker イメージを Amazon ECRリポジトリにプッシュしたら、 PySparkProcessor を使用して処理ジョブを作成できます。 PySpark プロセッサを使用した処理ジョブの作成の詳細については、「」を参照してくださいApache Spark で処理ジョブを実行する

以下は、Dockerfile にインストールステップを追加する例です。

FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0 RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose

Feature Store Spark JARの の取得

Feature Store Spark 依存関係 を取得するにはJAR、ネットワークアクセスを持つpip任意の Python 環境で を使用して、Python パッケージインデックス (PyPI) リポジトリから Spark コネクタをインストールする必要があります。 SageMaker Jupyter Notebook は、ネットワークアクセスを持つ Python 環境の例です。

以下のコマンドでは Spark コネクタをインストールします。

!pip install sagemaker-feature-store-pyspark-3.1

Feature Store Spark をインストールしたら、JAR場所を取得し、Amazon S3 JARに をアップロードできます。

feature-store-pyspark-dependency-jars コマンドは、Feature Store Spark が追加JARした必要な依存関係の場所を提供します。コマンドを使用して を取得しJAR、Amazon S3 にアップロードできます。

jar_location = !feature-store-pyspark-dependency-jars jar_location = jar_location[0] s3_client = boto3.client("s3") s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")

実装例

Example Python script

FeatureStoreBatchIngestion.py

from pyspark.sql import SparkSession from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager import feature_store_pyspark spark = SparkSession.builder \ .getOrCreate() # Construct test DataFrame columns = ["RecordIdentifier", "EventTime"] data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")] df = spark.createDataFrame(data).toDF(*columns) # Initialize FeatureStoreManager with a role arn if your feature group is created by another account feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") # Load the feature definitions from input schema. The feature definitions can be used to create a feature group feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df) feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" # Ingest by default. The connector will leverage PutRecord API to ingest your data in stream # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn) # To select the target stores for ingestion, you can specify the target store as the paramter # If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"]) # If only OfflineStore is selected, the connector will batch write the data to offline store directly feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"]) # To retrieve the records failed to be ingested by spark connector failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()

サンプル Python スクリプトを使用して Spark ジョブを送信する

この PySpark バージョンでは、追加の依存関係をインポートJARする必要があるため、Spark アプリケーションを実行するには追加のステップが必要です。

インストールSPARK_HOME時に を指定しなかった場合、 の実行JARsJVM時に に必要な をロードする必要がありますspark-submitfeature-store-pyspark-dependency-jarsは、Spark ライブラリによってインストールされた Python スクリプトであり、すべての へのパスを自動的に取得JARsします。

spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py

Amazon でこのアプリケーションを実行している場合はEMR、依存関係を他のタスクノードJARsに分散する必要がないように、クライアントモードでアプリケーションを実行することをお勧めします。次のような Spark 引数を使用して Amazon EMRクラスターにもう 1 つのステップを追加します。

spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
Example Scala script

FeatureStoreBatchIngestion.scala

import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object TestSparkApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate() // Construct test DataFrame val data = List( Row("1", "2021-07-01T12:20:12Z"), Row("2", "2021-07-02T12:20:13Z"), Row("3", "2021-07-03T12:20:14Z") ) val schema = StructType( List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType)) ) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) // Initialize FeatureStoreManager with a role arn if your feature group is created by another account val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") // Load the feature definitions from input schema. The feature definitions can be used to create a feature group val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df) val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html featureStoreManager.ingestData(df, featureGroupArn) // To select the target stores for ingestion, you can specify the target store as the paramter // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore")) // If only OfflineStore is selected, the connector will batch write the data to offline store directly featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"]) // To retrieve the records failed to be ingested by spark connector val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame() } }

Spark ジョブの送信

Scala

Feature Store Spark は通常の依存関係として使用できるはずです。すべてのプラットフォームでアプリケーションを実行するために、追加の指示は必要ありません。