Konsumsi batch dengan Amazon SageMaker Feature Store Spark - Amazon SageMaker AI

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

Konsumsi batch dengan Amazon SageMaker Feature Store Spark

Amazon SageMaker Feature Store Spark adalah konektor Spark yang menghubungkan perpustakaan Spark ke Feature Store. Feature Store Spark menyederhanakan konsumsi data dari Spark DataFrame ke grup fitur. Feature Store mendukung konsumsi data batch dengan Spark, menggunakan pipeline ETL yang ada, di Amazon EMR, GIS, pekerjaan, pekerjaan Amazon SageMaker Processing AWS Glue , atau notebook. SageMaker

Metode untuk menginstal dan mengimplementasikan konsumsi data batch disediakan untuk pengembang Python dan Scala. Pengembang Python dapat menggunakan pustaka sagemaker-feature-store-pyspark Python open-source untuk pengembangan lokal, instalasi di Amazon EMR, dan untuk Notebook Jupyter dengan mengikuti petunjuk di repositori Amazon Feature Store Spark. SageMaker GitHub Pengembang scala dapat menggunakan konektor Feature Store Spark yang tersedia di repositori pusat Amazon SageMaker Feature Store Spark SDK Maven.

Anda dapat menggunakan konektor Spark untuk menyerap data dengan cara berikut, tergantung pada apakah toko online, toko offline, atau keduanya diaktifkan.

  1. Ingest secara default - Jika toko online diaktifkan, konektor Spark pertama-tama menyerap kerangka data Anda ke toko online menggunakan API. PutRecord Hanya catatan dengan waktu acara terbesar yang tersisa di toko online. Jika toko offline diaktifkan, dalam waktu 15 menit Toko Fitur menyerap kerangka data Anda ke toko offline. Untuk informasi selengkapnya tentang cara kerja toko online dan offline, lihatKonsep Toko Fitur.

    Anda dapat mencapai ini dengan tidak menentukan target_stores dalam metode. .ingest_data(...)

  2. Konsumsi langsung toko offline - Jika toko offline diaktifkan, batch konektor Spark menyerap kerangka data Anda langsung ke toko offline. Menelan kerangka data langsung ke toko offline tidak memperbarui toko online.

    Anda dapat mencapai ini dengan mengatur target_stores=["OfflineStore"] .ingest_data(...) metode.

  3. Hanya toko online - Jika toko online diaktifkan, konektor Spark menyerap kerangka data Anda ke toko online menggunakan API. PutRecord Menelan kerangka data langsung ke toko online tidak memperbarui toko offline.

    Anda dapat mencapai ini dengan mengatur target_stores=["OnlineStore"] .ingest_data(...) metode.

Untuk informasi tentang menggunakan metode konsumsi yang berbeda, lihat. Contoh implementasi

Instalasi Fitur Store Spark

Pengguna scala

Feature Store Spark SDK tersedia di repositori pusat Amazon SageMaker Feature Store Spark SDK Maven untuk pengguna Scala.

Persyaratan

  • Spark >= 3.0.0 dan <= 3.3.0

  • iceberg-spark-runtime>= 0.14.0

  • Skala >= 2.12.x 

  • Amazon EMR >= 6.1.0 (hanya jika Anda menggunakan Amazon EMR)

Deklarasikan ketergantungan di POM.xml

Konektor Feature Store Spark memiliki ketergantungan pada perpustakaan. iceberg-spark-runtime Oleh karena itu, Anda harus menambahkan versi iceberg-spark-runtime pustaka yang sesuai ke dependensi jika Anda memasukkan data ke dalam grup fitur yang telah Anda buat secara otomatis dengan format tabel Iceberg. Misalnya, jika Anda menggunakan Spark 3.1, Anda harus mendeklarasikan hal berikut di proyek Anda: POM.xml

<dependency> <groupId>software.amazon.sagemaker.featurestore</groupId> <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId> <version>0.14.0</version> </dependency>

Pengguna Python

Feature Store Spark SDK tersedia di repositori SageMaker Amazon Feature Store GitHub Spark sumber terbuka.

Persyaratan

  • Spark >= 3.0.0 dan <= 3.3.0

  • Amazon EMR >= 6.1.0 (hanya jika Anda menggunakan Amazon EMR)

  • Kernel = conda_python3

Kami merekomendasikan pengaturan $SPARK_HOME ke direktori tempat Anda menginstal Spark. Selama instalasi, Feature Store mengunggah JAR yang diperlukan keSPARK_HOME, sehingga dependensi dimuat secara otomatis. Spark memulai JVM diperlukan untuk membuat perpustakaan ini PySpark berfungsi.

Instalasi lokal

Untuk menemukan info lebih lanjut tentang instalasi, aktifkan mode verbose dengan menambahkan --verbose ke perintah instalasi berikut.

pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Instalasi di Amazon EMR

Buat klaster EMR Amazon dengan versi rilis 6.1.0 atau yang lebih baru. Aktifkan SSH untuk membantu Anda memecahkan masalah apa pun.

Anda dapat melakukan salah satu hal berikut untuk menginstal perpustakaan:

  • Buat langkah khusus dalam Amazon EMR.

  • Connect ke cluster Anda menggunakan SSH dan instal perpustakaan dari sana.

catatan

Informasi berikut menggunakan Spark versi 3.1, tetapi Anda dapat menentukan versi apa pun yang memenuhi persyaratan.

export SPARK_HOME=/usr/lib/spark sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
catatan

Jika Anda ingin menginstal dependen JARs secara otomatis ke SPARK_HOME, jangan gunakan langkah bootstrap.

Instalasi pada instance SageMaker notebook

Instal versi PySpark yang kompatibel dengan konektor Spark menggunakan perintah berikut:

!pip3 install pyspark==3.1.1 !pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Jika Anda melakukan batch ingestion ke toko offline, dependensi tidak berada dalam lingkungan instance notebook.

from pyspark.sql import SparkSession import feature_store_pyspark extra_jars = ",".join(feature_store_pyspark.classpath_jars()) spark = SparkSession.builder \ .config("spark.jars", extra_jars) \ .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \ .getOrCreate()

Instalasi pada notebook dengan GIS

penting

Anda harus menggunakan AWS Glue versi 2.0 atau yang lebih baru.

Gunakan informasi berikut untuk membantu Anda menginstal PySpark konektor dalam Sesi AWS Glue Interaktif (GIS).

Amazon SageMaker Feature Store Spark memerlukan JAR konektor Spark tertentu selama inisialisasi sesi untuk diunggah ke bucket Amazon S3 Anda. Untuk informasi lebih lanjut tentang mengunggah JAR yang diperlukan ke bucket S3 Anda, lihat. Mengambil JAR untuk Feature Store Spark

Setelah Anda mengunggah JAR, Anda harus menyediakan sesi GIS dengan JAR menggunakan perintah berikut.

%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar

Untuk menginstal Feature Store Spark di AWS Glue runtime, gunakan perintah %additional_python_modules ajaib di dalam notebook GIS. AWS Glue berjalan pip ke modul yang telah Anda tentukan di bawah%additional_python_modules.

%additional_python_modules sagemaker-feature-store-pyspark-3.1

Sebelum Anda memulai AWS Glue sesi, Anda harus menggunakan kedua perintah sihir sebelumnya.

Instalasi pada suatu AWS Glue pekerjaan

penting

Anda harus menggunakan AWS Glue versi 2.0 atau yang lebih baru.

Untuk menginstal konektor Spark pada AWS Glue pekerjaan, gunakan --extra-jars argumen untuk menyediakan yang diperlukan JARs dan --additional-python-modules untuk menginstal Spark Connector sebagai parameter pekerjaan ketika Anda membuat AWS Glue pekerjaan seperti yang ditunjukkan pada contoh berikut. Untuk informasi lebih lanjut tentang mengunggah JAR yang diperlukan ke bucket S3 Anda, lihat. Mengambil JAR untuk Feature Store Spark

glue_client = boto3.client('glue', region_name=region) response = glue_client.create_job( Name=pipeline_id, Description='Feature Store Compute Job', Role=glue_role_arn, ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run}, Command={ 'Name': 'glueetl', 'ScriptLocation': script_location_uri, 'PythonVersion': '3' }, DefaultArguments={ '--TempDir': temp_dir_location_uri, '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1', '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar", ... }, MaxRetries=3, NumberOfWorkers=149, Timeout=2880, GlueVersion='3.0', WorkerType='G.2X' )

Instalasi pada pekerjaan SageMaker Pemrosesan Amazon

Untuk menggunakan pekerjaan Feature Store Spark dengan Amazon SageMaker Processing, bawalah gambar Anda sendiri. Untuk informasi selengkapnya tentang membawa gambar Anda, lihatBawa gambar SageMaker AI Anda sendiri. Tambahkan langkah instalasi ke Dockerfile. Setelah mendorong image Docker ke repositori Amazon ECR, Anda dapat menggunakannya PySparkProcessor untuk membuat pekerjaan pemrosesan. Untuk informasi selengkapnya tentang membuat pekerjaan pemrosesan dengan PySpark prosesor, lihatJalankan Processing Job dengan Apache Spark.

Berikut ini adalah contoh menambahkan langkah instalasi ke Dockerfile.

FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0 RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose

Mengambil JAR untuk Feature Store Spark

Untuk mengambil JAR ketergantungan Feature Store Spark, Anda harus menginstal konektor Spark dari repositori Python Package Index (PyPI) menggunakan di lingkungan Python apa pun dengan akses jaringan. pip Notebook SageMaker Jupyter adalah contoh lingkungan Python dengan akses jaringan.

Perintah berikut menginstal konektor Spark.

!pip install sagemaker-feature-store-pyspark-3.1

Setelah menginstal Feature Store Spark, Anda dapat mengambil lokasi JAR dan mengunggah JAR ke Amazon S3.

feature-store-pyspark-dependency-jarsPerintah menyediakan lokasi JAR ketergantungan yang diperlukan yang ditambahkan oleh Feature Store Spark. Anda dapat menggunakan perintah untuk mengambil JAR dan mengunggahnya ke Amazon S3.

jar_location = !feature-store-pyspark-dependency-jars jar_location = jar_location[0] s3_client = boto3.client("s3") s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")

Contoh implementasi

Example Python script

FeatureStoreBatchIngestion.py

from pyspark.sql import SparkSession from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager import feature_store_pyspark spark = SparkSession.builder \ .getOrCreate() # Construct test DataFrame columns = ["RecordIdentifier", "EventTime"] data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")] df = spark.createDataFrame(data).toDF(*columns) # Initialize FeatureStoreManager with a role arn if your feature group is created by another account feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") # Load the feature definitions from input schema. The feature definitions can be used to create a feature group feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df) feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" # Ingest by default. The connector will leverage PutRecord API to ingest your data in stream # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn) # To select the target stores for ingestion, you can specify the target store as the paramter # If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"]) # If only OfflineStore is selected, the connector will batch write the data to offline store directly feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"]) # To retrieve the records failed to be ingested by spark connector failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()

Kirim pekerjaan Spark dengan contoh skrip Python

PySpark Versi ini membutuhkan JAR ekstra dependen untuk diimpor, jadi langkah tambahan diperlukan untuk menjalankan aplikasi Spark.

Jika Anda tidak menentukan SPARK_HOME selama instalasi, maka Anda harus memuat yang diperlukan JARs di JVM saat berjalan. spark-submit feature-store-pyspark-dependency-jarsadalah skrip Python yang diinstal oleh perpustakaan Spark untuk secara otomatis mengambil jalur ke semua untuk Anda. JARs

spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py

Jika Anda menjalankan aplikasi ini di Amazon EMR, kami sarankan Anda menjalankan aplikasi dalam mode klien, sehingga Anda tidak perlu mendistribusikan dependen JARs ke node tugas lain. Tambahkan satu langkah lagi di cluster EMR Amazon dengan argumen Spark yang mirip dengan berikut ini:

spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
Example Scala script

FeatureStoreBatchIngestion.scala

import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object TestSparkApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate() // Construct test DataFrame val data = List( Row("1", "2021-07-01T12:20:12Z"), Row("2", "2021-07-02T12:20:13Z"), Row("3", "2021-07-03T12:20:14Z") ) val schema = StructType( List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType)) ) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) // Initialize FeatureStoreManager with a role arn if your feature group is created by another account val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") // Load the feature definitions from input schema. The feature definitions can be used to create a feature group val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df) val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html featureStoreManager.ingestData(df, featureGroupArn) // To select the target stores for ingestion, you can specify the target store as the paramter // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore")) // If only OfflineStore is selected, the connector will batch write the data to offline store directly featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"]) // To retrieve the records failed to be ingested by spark connector val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame() } }

Kirim pekerjaan Spark

Scala

Anda harus dapat menggunakan Feature Store Spark sebagai dependensi normal. Tidak diperlukan instruksi tambahan untuk menjalankan aplikasi di semua platform.