Inserimento in batch con Amazon SageMaker Feature Store Spark - Amazon SageMaker

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Inserimento in batch con Amazon SageMaker Feature Store Spark

Amazon SageMaker Feature Store Spark è un connettore Spark che collega la libreria Spark al Feature Store. Feature Store Spark semplifica l'inserimento dei dati da Spark DataFrame nei gruppi di funzionalità. Feature Store supporta l'inserimento di dati in batch con Spark, utilizzando la ETL pipeline esistente, su AmazonEMR, un processo GIS AWS Glue , un processo di Amazon SageMaker Processing o un notebook. SageMaker

I metodi per l'installazione e l'implementazione dell'inserimento di dati in batch sono disponibili per gli sviluppatori Python e Scala. Gli sviluppatori Python possono utilizzare la libreria sagemaker-feature-store-pyspark Python open source per lo sviluppo locale, l'installazione su EMR Amazon e per Jupyter Notebooks seguendo le istruzioni nel repository Amazon Feature Store Spark. SageMaker GitHub Gli sviluppatori di Scala possono utilizzare il connettore Feature Store Spark disponibile nell'archivio centrale Amazon SageMaker Feature Store Spark SDK Maven.

Puoi utilizzare il connettore Spark per importare dati nei seguenti modi, a seconda che l'archivio online, l'archivio offline o entrambi siano abilitati.

  1. Inserisci per impostazione predefinita: se il negozio online è abilitato, il connettore Spark inserisce innanzitutto il tuo dataframe nel negozio online utilizzando il. PutRecordAPI Nell'archivio online rimane solo il record con il maggior numero di eventi. Se l'archivio offline è abilitato, entro 15 minuti Feature Store inserisce il data frame nell'archivio offline. Per ulteriori informazioni sul funzionamento degli archivi online e offline, consulta Concetti di base sul Feature Store.

    È possibile eseguire questa operazione anche senza specificare target_stores nel metodo .ingest_data(...).

  2. Inserimento diretto in archivio offline: se l’archivio offline è abilitato, Spark Connector inserisce in batch il data frame direttamente nell’archivio offline. L'inserimento del data frame direttamente nell'archivio offline non aggiorna l'archivio online.

    È possibile eseguire questa operazione impostando target_stores=["OfflineStore"] nel metodo .ingest_data(...).

  3. Solo negozio online: se il negozio online è abilitato, il connettore Spark inserisce il tuo dataframe nel negozio online utilizzando il. PutRecordAPI L'inserimento del data frame direttamente nell'archivio online non aggiorna l'archivio offline.

    È possibile eseguire questa operazione impostando target_stores=["OnlineStore"] nel metodo .ingest_data(...).

Per ulteriori informazioni sui diversi metodi di inserimento, consulta Implementazioni esemplificative.

Installazione di Feature Store Spark

Utenti Scala

Il Feature Store Spark SDK è disponibile nell'archivio centrale Amazon SageMaker Feature Store Spark SDK Maven per gli utenti Scala.

Requisiti

  • Spark >= 3.0.0 e <= 3.3.0

  • iceberg-spark-runtime >= 0.14.0

  • Scala >= 2.12.x 

  • Amazon EMR >= 6.1.0 (solo se utilizzi Amazon) EMR

Dichiara la dipendenza in .xml POM

Il connettore Feature Store Spark dipende dalla libreria iceberg-spark-runtime. Devi quindi aggiungere la versione corrispondente della libreria iceberg-spark-runtime alla dipendenza se stai inserendo dati in un gruppo di funzionalità che hai creato automaticamente con il formato di tabella Iceberg. Ad esempio, se stai usando Spark 3.1, devi dichiarare quanto segue nel tuo progetto POM.xml:

<dependency> <groupId>software.amazon.sagemaker.featurestore</groupId> <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId> <version>0.14.0</version> </dependency>

Utenti Python

Feature Store Spark SDK è disponibile nel repository open source Amazon SageMaker Feature Store GitHub Spark.

Requisiti

  • Spark >= 3.0.0 e <= 3.3.0

  • Amazon EMR >= 6.1.0 (solo se utilizzi Amazon) EMR

  • Kernel = conda_python3

Consigliamo di impostare $SPARK_HOME sulla directory in cui è installato Spark. Durante l'installazione, Feature Store carica il file necessarioSPARK_HOME, in modo che JAR le dipendenze vengano caricate automaticamente. Spark start a JVM è necessario per far funzionare questa PySpark libreria.

Installazione locale

Per maggiori informazioni sull'installazione, abilita la modalità dettagliata aggiungendo --verbose al seguente comando di installazione.

pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Installazione su Amazon EMR

Crea un EMR cluster Amazon con la versione 6.1.0 o successiva. Abilita SSH per aiutarti a risolvere eventuali problemi.

Per installare la libreria, procedi come segue:

  • Crea un passaggio personalizzato all'interno di AmazonEMR.

  • Connect al cluster utilizzando SSH e installando la libreria da lì.

Nota

Le seguenti informazioni utilizzano la versione 3.1 di Spark, ma puoi specificare qualsiasi versione che soddisfi i requisiti.

export SPARK_HOME=/usr/lib/spark sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
Nota

Se desideri installare JARs automaticamente il dipendente su SPARK _HOME, non utilizzare la fase di bootstrap.

Installazione su un'istanza di SageMaker notebook

Installa una versione compatibile con il connettore Spark utilizzando i seguenti comandi: PySpark

!pip3 install pyspark==3.1.1 !pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Se stai eseguendo l'importazione in batch nell’archivio offline, le dipendenze non rientrano nell'ambiente dell'istanza del notebook.

from pyspark.sql import SparkSession import feature_store_pyspark extra_jars = ",".join(feature_store_pyspark.classpath_jars()) spark = SparkSession.builder \ .config("spark.jars", extra_jars) \ .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \ .getOrCreate()

Installazione su notebook con GIS

Importante

È necessario utilizzare la AWS Glue versione 2.0 o successiva.

Utilizzate le seguenti informazioni per facilitare l'installazione del PySpark connettore in una sessione AWS Glue interattiva (GIS).

Amazon SageMaker Feature Store Spark richiede un connettore Spark specifico JAR durante l'inizializzazione della sessione da caricare nel tuo bucket Amazon S3. Per ulteriori informazioni sul caricamento del necessario JAR nel bucket S3, consulta. Recupero di For Feature JAR Store Spark

Dopo aver caricato ilJAR, devi fornire alle GIS sessioni il JAR comando seguente.

%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar

Per installare Feature Store Spark in AWS Glue fase di esecuzione, usa il comando %additional_python_modules magico all'interno del GIS notebook. AWS Glue viene eseguito sui pip moduli che hai specificato in%additional_python_modules.

%additional_python_modules sagemaker-feature-store-pyspark-3.1

Prima di iniziare la AWS Glue sessione, è necessario utilizzare entrambi i comandi magici precedenti.

Installazione su un lavoro AWS Glue

Importante

È necessario utilizzare AWS Glue la versione 2.0 o successiva.

Per installare il connettore Spark su un AWS Glue job, utilizzate l'--extra-jarsargomento per fornire quanto necessario JARs e --additional-python-modules installare lo Spark Connector come parametro del job quando create il AWS Glue job, come illustrato nell'esempio seguente. Per ulteriori informazioni sul caricamento del file richiesto nel bucket JAR S3, consulta. Recupero di For Feature JAR Store Spark

glue_client = boto3.client('glue', region_name=region) response = glue_client.create_job( Name=pipeline_id, Description='Feature Store Compute Job', Role=glue_role_arn, ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run}, Command={ 'Name': 'glueetl', 'ScriptLocation': script_location_uri, 'PythonVersion': '3' }, DefaultArguments={ '--TempDir': temp_dir_location_uri, '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1', '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar", ... }, MaxRetries=3, NumberOfWorkers=149, Timeout=2880, GlueVersion='3.0', WorkerType='G.2X' )

Installazione su un processo di Amazon SageMaker Processing

Per utilizzare Feature Store Spark con i lavori di Amazon SageMaker Processing, porta la tua immagine. Per informazioni sul caricamento di una propria immagine, consulta Porta la tua SageMaker immagine. Aggiungi la fase di installazione a un file Docker. Dopo aver inviato l'immagine Docker a un ECR repository Amazon, puoi utilizzare il file PySparkProcessor per creare il processo di elaborazione. Per ulteriori informazioni sulla creazione di un processo di elaborazione con il PySpark processore, consulta. Esecuzione di un job di elaborazione con Apache Spark

Di seguito è riportato un esempio di aggiunta di una fase di installazione al file Docker.

FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0 RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose

Recupero di For Feature JAR Store Spark

Per recuperare la dipendenza Spark del Feature StoreJAR, devi installare il connettore Spark dal repository Python Package Index (PyPI) pip utilizzandolo in qualsiasi ambiente Python con accesso alla rete. Un SageMaker Jupyter Notebook è un esempio di ambiente Python con accesso alla rete.

Il comando seguente installa il connettore Spark.

!pip install sagemaker-feature-store-pyspark-3.1

Dopo aver installato Feature Store Spark, puoi recuperare la JAR posizione e JAR caricarla su Amazon S3.

Il feature-store-pyspark-dependency-jars comando fornisce la posizione della dipendenza necessaria aggiunta da Feature Store JAR Spark. Puoi usare il comando per recuperarlo JAR e caricarlo su Amazon S3.

jar_location = !feature-store-pyspark-dependency-jars jar_location = jar_location[0] s3_client = boto3.client("s3") s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")

Implementazioni esemplificative

Example Python script

FeatureStoreBatchIngestion.py

from pyspark.sql import SparkSession from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager import feature_store_pyspark spark = SparkSession.builder \ .getOrCreate() # Construct test DataFrame columns = ["RecordIdentifier", "EventTime"] data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")] df = spark.createDataFrame(data).toDF(*columns) # Initialize FeatureStoreManager with a role arn if your feature group is created by another account feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") # Load the feature definitions from input schema. The feature definitions can be used to create a feature group feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df) feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" # Ingest by default. The connector will leverage PutRecord API to ingest your data in stream # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn) # To select the target stores for ingestion, you can specify the target store as the paramter # If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"]) # If only OfflineStore is selected, the connector will batch write the data to offline store directly feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"]) # To retrieve the records failed to be ingested by spark connector failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()

Invio di un processo Spark con uno script Python esemplificativo

La PySpark versione richiede l'importazione JAR di un dipendente aggiuntivo, quindi sono necessari passaggi aggiuntivi per eseguire l'applicazione Spark.

Se non l'hai specificato SPARK_HOME durante l'installazione, devi caricare il file richiesto JARs JVM durante l'esecuzionespark-submit. feature-store-pyspark-dependency-jarsè uno script Python installato dalla libreria Spark per recuperare automaticamente il percorso di tutti per te. JARs

spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py

Se esegui questa applicazione su AmazonEMR, ti consigliamo di eseguire l'applicazione in modalità client, in modo da non dover distribuire il dipendente JARs ad altri nodi di attività. Aggiungi un altro passaggio nel EMR cluster Amazon con argomento Spark simile al seguente:

spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
Example Scala script

FeatureStoreBatchIngestion.scala

import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object TestSparkApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate() // Construct test DataFrame val data = List( Row("1", "2021-07-01T12:20:12Z"), Row("2", "2021-07-02T12:20:13Z"), Row("3", "2021-07-03T12:20:14Z") ) val schema = StructType( List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType)) ) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) // Initialize FeatureStoreManager with a role arn if your feature group is created by another account val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") // Load the feature definitions from input schema. The feature definitions can be used to create a feature group val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df) val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html featureStoreManager.ingestData(df, featureGroupArn) // To select the target stores for ingestion, you can specify the target store as the paramter // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore")) // If only OfflineStore is selected, the connector will batch write the data to offline store directly featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"]) // To retrieve the records failed to be ingested by spark connector val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame() } }

Invio di un processo Spark

Scala

Dovresti essere in grado di utilizzare Feature Store Spark come normale dipendenza. Non sono necessarie istruzioni aggiuntive per eseguire l'applicazione su tutte le piattaforme.