Inserimento in batch con Amazon SageMaker Feature Store Spark - Amazon SageMaker AI

Le traduzioni sono generate tramite traduzione automatica. In caso di conflitto tra il contenuto di una traduzione e la versione originale in Inglese, quest'ultima prevarrà.

Inserimento in batch con Amazon SageMaker Feature Store Spark

Amazon SageMaker Feature Store Spark è un connettore Spark che collega la libreria Spark al Feature Store. Feature Store Spark semplifica l'inserimento dei dati da Spark DataFrame nei gruppi di funzionalità. Feature Store supporta l'inserimento di dati in batch con Spark, utilizzando la pipeline ETL esistente, su Amazon EMR, GIS, un job, AWS Glue un job di SageMaker Amazon Processing o un notebook. SageMaker

I metodi per l'installazione e l'implementazione dell'inserimento di dati in batch sono disponibili per gli sviluppatori Python e Scala. Gli sviluppatori Python possono utilizzare la libreria sagemaker-feature-store-pyspark Python open source per lo sviluppo locale, l'installazione su Amazon EMR e per Jupyter Notebooks seguendo le istruzioni nel repository Amazon Feature Store Spark. SageMaker GitHub Gli sviluppatori di Scala possono utilizzare il connettore Feature Store Spark disponibile nell'archivio centrale Amazon SageMaker Feature Store Spark SDK Maven.

Puoi utilizzare il connettore Spark per importare dati nei seguenti modi, a seconda che l'archivio online, l'archivio offline o entrambi siano abilitati.

  1. Ingestisci per impostazione predefinita: se il negozio online è abilitato, il connettore Spark inserisce innanzitutto il tuo dataframe nel negozio online utilizzando l'API. PutRecord Nell'archivio online rimane solo il record con il maggior numero di eventi. Se l'archivio offline è abilitato, entro 15 minuti Feature Store inserisce il data frame nell'archivio offline. Per ulteriori informazioni sul funzionamento degli archivi online e offline, consulta Concetti di base sul Feature Store.

    È possibile eseguire questa operazione anche senza specificare target_stores nel metodo .ingest_data(...).

  2. Inserimento diretto in archivio offline: se l’archivio offline è abilitato, Spark Connector inserisce in batch il data frame direttamente nell’archivio offline. L'inserimento del data frame direttamente nell'archivio offline non aggiorna l'archivio online.

    È possibile eseguire questa operazione impostando target_stores=["OfflineStore"] nel metodo .ingest_data(...).

  3. Solo negozio online: se il negozio online è abilitato, Spark Connector inserisce il tuo dataframe nel negozio online utilizzando l'API. PutRecord L'inserimento del data frame direttamente nell'archivio online non aggiorna l'archivio offline.

    È possibile eseguire questa operazione impostando target_stores=["OnlineStore"] nel metodo .ingest_data(...).

Per ulteriori informazioni sui diversi metodi di inserimento, consulta Implementazioni esemplificative.

Installazione di Feature Store Spark

Utenti Scala

L'SDK Feature Store Spark è disponibile nell'archivio centrale Amazon SageMaker Feature Store Spark SDK Maven per gli utenti Scala.

Requisiti

  • Spark >= 3.0.0 e <= 3.3.0

  • iceberg-spark-runtime >= 0.14.0

  • Scala >= 2.12.x 

  • Amazon EMR >= 6.1.0 (solo se utilizzi Amazon EMR)

Dichiarazione della dipendenza in POM.xml

Il connettore Feature Store Spark dipende dalla libreria iceberg-spark-runtime. Devi quindi aggiungere la versione corrispondente della libreria iceberg-spark-runtime alla dipendenza se stai inserendo dati in un gruppo di funzionalità che hai creato automaticamente con il formato di tabella Iceberg. Ad esempio, se stai usando Spark 3.1, devi dichiarare quanto segue nel tuo progetto POM.xml:

<dependency> <groupId>software.amazon.sagemaker.featurestore</groupId> <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId> <version>0.14.0</version> </dependency>

Utenti Python

L'SDK Feature Store Spark è disponibile nel repository open source SageMaker Amazon Feature Store GitHub Spark.

Requisiti

  • Spark >= 3.0.0 e <= 3.3.0

  • Amazon EMR >= 6.1.0 (solo se utilizzi Amazon EMR)

  • Kernel = conda_python3

Consigliamo di impostare $SPARK_HOME sulla directory in cui è installato Spark. Durante l'installazione, Feature Store carica il JAR richiesto in SPARK_HOME, in modo che le dipendenze vengano caricate automaticamente. L'avvio di una JVM da parte di Spark è necessario per far funzionare questa libreria. PySpark

Installazione locale

Per maggiori informazioni sull'installazione, abilita la modalità dettagliata aggiungendo --verbose al seguente comando di installazione.

pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Installazione su Amazon EMR

Creazione di un cluster Amazon EMR con release 6.1.0 o versioni successive. Abilita SSH per risolvere eventuali problemi.

Per installare la libreria, procedi come segue:

  • Crea una fase personalizzata all'interno di Amazon EMR.

  • Connettiti al cluster tramite SSH e installa la libreria da questa posizione.

Nota

Le seguenti informazioni utilizzano la versione 3.1 di Spark, ma puoi specificare qualsiasi versione che soddisfi i requisiti.

export SPARK_HOME=/usr/lib/spark sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
Nota

Se desideri installare JARs automaticamente il dipendente su SPARK_HOME, non utilizzare la fase di bootstrap.

Installazione su un'istanza di notebook SageMaker

Installa una versione compatibile con il connettore Spark utilizzando i seguenti comandi: PySpark

!pip3 install pyspark==3.1.1 !pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Se stai eseguendo l'importazione in batch nell’archivio offline, le dipendenze non rientrano nell'ambiente dell'istanza del notebook.

from pyspark.sql import SparkSession import feature_store_pyspark extra_jars = ",".join(feature_store_pyspark.classpath_jars()) spark = SparkSession.builder \ .config("spark.jars", extra_jars) \ .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \ .getOrCreate()

Installazione su notebook con GIS

Importante

È necessario utilizzare AWS Glue la versione 2.0 o successiva.

Utilizza le seguenti informazioni per aiutarti a installare il PySpark connettore in una sessione AWS Glue interattiva (GIS).

Amazon SageMaker Feature Store Spark richiede un connettore Spark JAR specifico durante l'inizializzazione della sessione da caricare nel tuo bucket Amazon S3. Per ulteriori informazioni sul caricamento del JAR richiesto nel bucket S3, consulta Recupero del JAR per Feature Store Spark.

Dopo aver caricato il JAR, devi fornirlo per le sessioni GIS utilizzando il seguente comando.

%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar

Per installare Feature Store Spark in fase di AWS Glue esecuzione, usa il comando %additional_python_modules magico all'interno del notebook GIS. AWS Glue viene eseguito sui pip moduli che hai specificato in. %additional_python_modules

%additional_python_modules sagemaker-feature-store-pyspark-3.1

Prima di iniziare la AWS Glue sessione, è necessario utilizzare entrambi i comandi magici precedenti.

Installazione su un lavoro AWS Glue

Importante

È necessario utilizzare AWS Glue la versione 2.0 o successiva.

Per installare il connettore Spark su un AWS Glue job, utilizzate l'--extra-jarsargomento per fornire quanto necessario JARs e --additional-python-modules installare lo Spark Connector come parametro del job quando create il AWS Glue job, come illustrato nell'esempio seguente. Per ulteriori informazioni sul caricamento del JAR richiesto nel bucket S3, consulta Recupero del JAR per Feature Store Spark.

glue_client = boto3.client('glue', region_name=region) response = glue_client.create_job( Name=pipeline_id, Description='Feature Store Compute Job', Role=glue_role_arn, ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run}, Command={ 'Name': 'glueetl', 'ScriptLocation': script_location_uri, 'PythonVersion': '3' }, DefaultArguments={ '--TempDir': temp_dir_location_uri, '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1', '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar", ... }, MaxRetries=3, NumberOfWorkers=149, Timeout=2880, GlueVersion='3.0', WorkerType='G.2X' )

Installazione su un processo di Amazon SageMaker Processing

Per utilizzare Feature Store Spark con i lavori di Amazon SageMaker Processing, porta la tua immagine. Per informazioni sul caricamento di una propria immagine, consulta Porta la tua immagine SageMaker AI. Aggiungi la fase di installazione a un file Docker. Dopo aver inviato l'immagine Docker a un repository Amazon ECR, puoi utilizzare il file PySparkProcessor per creare il processo di elaborazione. Per ulteriori informazioni sulla creazione di un processo di elaborazione con il PySpark processore, consulta. Esecuzione di un job di elaborazione con Apache Spark

Di seguito è riportato un esempio di aggiunta di una fase di installazione al file Docker.

FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0 RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose

Recupero del JAR per Feature Store Spark

Per recuperare il JAR della dipendenza Feature Store Spark, devi installare il connettore Spark dal repository Python Package Index (PyPI) usando pip in qualsiasi ambiente Python con accesso alla rete. Un SageMaker Jupyter Notebook è un esempio di ambiente Python con accesso alla rete.

Il comando seguente installa il connettore Spark.

!pip install sagemaker-feature-store-pyspark-3.1

Dopo aver installato Feature Store Spark, puoi recuperare la posizione JAR e caricarla su Amazon S3.

Il comando feature-store-pyspark-dependency-jars fornisce la posizione della dipendenza JAR necessaria aggiunta da Feature Store Spark. Puoi utilizzare il comando per recuperare il file JAR e caricarlo su Amazon S3.

jar_location = !feature-store-pyspark-dependency-jars jar_location = jar_location[0] s3_client = boto3.client("s3") s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")

Implementazioni esemplificative

Example Python script

FeatureStoreBatchIngestion.py

from pyspark.sql import SparkSession from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager import feature_store_pyspark spark = SparkSession.builder \ .getOrCreate() # Construct test DataFrame columns = ["RecordIdentifier", "EventTime"] data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")] df = spark.createDataFrame(data).toDF(*columns) # Initialize FeatureStoreManager with a role arn if your feature group is created by another account feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") # Load the feature definitions from input schema. The feature definitions can be used to create a feature group feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df) feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" # Ingest by default. The connector will leverage PutRecord API to ingest your data in stream # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn) # To select the target stores for ingestion, you can specify the target store as the paramter # If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"]) # If only OfflineStore is selected, the connector will batch write the data to offline store directly feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"]) # To retrieve the records failed to be ingested by spark connector failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()

Invio di un processo Spark con uno script Python esemplificativo

La PySpark versione richiede l'importazione di un JAR dipendente aggiuntivo, quindi sono necessari passaggi aggiuntivi per eseguire l'applicazione Spark.

Se non l'hai specificato SPARK_HOME durante l'installazione, devi caricare il file richiesto JARs in JVM durante l'esecuzione. spark-submit feature-store-pyspark-dependency-jarsè uno script Python installato dalla libreria Spark per recuperare automaticamente il percorso di tutti per te. JARs

spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py

Se esegui questa applicazione su Amazon EMR, ti consigliamo di eseguire l'applicazione in modalità client, in modo da non dover distribuire il dipendente JARs ad altri nodi di attività. Aggiungi un’altra fase nel cluster Amazon EMR con argomento Spark simile al seguente:

spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
Example Scala script

FeatureStoreBatchIngestion.scala

import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object TestSparkApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate() // Construct test DataFrame val data = List( Row("1", "2021-07-01T12:20:12Z"), Row("2", "2021-07-02T12:20:13Z"), Row("3", "2021-07-03T12:20:14Z") ) val schema = StructType( List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType)) ) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) // Initialize FeatureStoreManager with a role arn if your feature group is created by another account val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") // Load the feature definitions from input schema. The feature definitions can be used to create a feature group val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df) val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html featureStoreManager.ingestData(df, featureGroupArn) // To select the target stores for ingestion, you can specify the target store as the paramter // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore")) // If only OfflineStore is selected, the connector will batch write the data to offline store directly featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"]) // To retrieve the records failed to be ingested by spark connector val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame() } }

Invio di un processo Spark

Scala

Dovresti essere in grado di utilizzare Feature Store Spark come normale dipendenza. Non sono necessarie istruzioni aggiuntive per eseguire l'applicazione su tutte le piattaforme.