Selecione suas preferências de cookies

Usamos cookies essenciais e ferramentas semelhantes que são necessárias para fornecer nosso site e serviços. Usamos cookies de desempenho para coletar estatísticas anônimas, para que possamos entender como os clientes usam nosso site e fazer as devidas melhorias. Cookies essenciais não podem ser desativados, mas você pode clicar em “Personalizar” ou “Recusar” para recusar cookies de desempenho.

Se você concordar, a AWS e terceiros aprovados também usarão cookies para fornecer recursos úteis do site, lembrar suas preferências e exibir conteúdo relevante, incluindo publicidade relevante. Para aceitar ou recusar todos os cookies não essenciais, clique em “Aceitar” ou “Recusar”. Para fazer escolhas mais detalhadas, clique em “Personalizar”.

Ingestão em lote com a Amazon SageMaker Feature Store Spark

Modo de foco
Ingestão em lote com a Amazon SageMaker Feature Store Spark - SageMaker IA da Amazon

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

O Amazon SageMaker Feature Store Spark é um conector Spark que conecta a biblioteca Spark à Feature Store. O Feature Store Spark simplifica a ingestão de dados do Spark DataFrame para grupos de atributos. O Feature Store suporta a ingestão de dados em lote com o Spark, usando seu pipeline de ETL existente, no Amazon EMR, no GIS, em um trabalho, em um trabalho AWS Glue de SageMaker processamento da Amazon ou em um notebook. SageMaker

Métodos para instalar e implantar a ingestão de dados em lote são fornecidos para desenvolvedores de Python e Scala. Os desenvolvedores de Python podem usar a biblioteca sagemaker-feature-store-pyspark Python de código aberto para desenvolvimento local, instalação no Amazon EMR e para notebooks Jupyter seguindo as instruções no repositório Spark da Amazon Feature Store. SageMaker GitHub Os desenvolvedores do Scala podem usar o conector Spark da Feature Store disponível no repositório central Maven do SDK Spark da Amazon SageMaker Feature Store.

Você pode usar o conector Spark para ingerir dados das seguintes formas, dependendo se o armazenamento on-line, o armazenamento offline ou ambos estão habilitados:

  1. Ingestão por padrão — Se a loja virtual estiver ativada, o conector Spark primeiro ingere seu dataframe na loja virtual usando a API. PutRecord Apenas o registro com o maior horário do evento permanecerá no armazenamento on-line. Se o armazenamento offline estiver habilitado, em 15 minutos, o Feature Store ingerirá seu dataframe no armazenamento offline. Para obter mais informações sobre como os armazenamentos on-line e offline funcionam, consulte Conceitos do Feature Store.

    Você pode fazer isso não especificando target_stores no método .ingest_data(...).

  2. Ingestão direta do armazenamento offline: Se o armazenamento offline estiver habilitado, o lote do conector Spark ingere seu dataframe diretamente no armazenamento offline. A ingestão do dataframe diretamente no armazenamento offline não atualiza o armazenamento on-line.

    Você pode fazer isso definindo target_stores=["OfflineStore"] no método .ingest_data(...).

  3. Somente loja virtual — Se a loja virtual estiver ativada, o conector Spark ingere seu dataframe na loja virtual usando a API. PutRecord A ingestão do dataframe diretamente no armazenamento on-line não atualiza o armazenamento offline.

    Você pode fazer isso definindo target_stores=["OnlineStore"] no método .ingest_data(...).

Para obter informações sobre como usar diferentes métodos de ingestão, consulte Implementações de exemplos.

Instalação do Feature Store Spark

Usuários do Scala

O SDK do Spark da Feature Store está disponível no repositório central Maven do SDK Spark da Amazon SageMaker Feature Store para usuários do Scala.

Requisitos

  • Spark >= 3.0.0 e <= 3.3.0

  • iceberg-spark-runtime >= 0.14.0

  • Scala >= 2.12.x 

  • Amazon EMR >= 6.1.0 (somente se você estiver usando o Amazon EMR)

Declarar a dependência em POM.xml

O conector do Feature Store Spark depende da biblioteca iceberg-spark-runtime. Portanto, você deve adicionar a versão correspondente da biblioteca iceberg-spark-runtime à dependência se estiver ingerindo dados em um grupo de atributos que você criou automaticamente com o formato de tabela Iceberg. Por exemplo, se você estiver usando o Spark 3.1, você deve declarar o seguinte no seu projeto POM.xml:

<dependency> <groupId>software.amazon.sagemaker.featurestore</groupId> <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId> <version>0.14.0</version> </dependency>

Usuários de Python

O SDK do Feature Store Spark está disponível no repositório de código aberto SageMaker Amazon Feature Store GitHub Spark.

Requisitos

  • Spark >= 3.0.0 e <= 3.3.0

  • Amazon EMR >= 6.1.0 (somente se você estiver usando o Amazon EMR)

  • Kernel = conda_python3

Recomendamos configurar o $SPARK_HOME para o diretório em que você tem o Spark instalado. Durante a instalação, o Feature Store carrega o JAR necessário para SPARK_HOME, para que as dependências sejam carregadas automaticamente. É necessário iniciar uma JVM pelo Spark para fazer essa PySpark biblioteca funcionar.

Instalação local

Para obter mais informações sobre a instalação, habilite o modo detalhado anexando --verbose ao seguinte comando de instalação:

pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Instalação no Amazon EMR

Crie um cluster do Amazon EMR com a versão 6.1.0 ou posterior. Habilite o SSH para ajudá-lo a solucionar qualquer problema.

Para instalar a biblioteca, faça o seguinte:

  • Crie uma etapa personalizada no Amazon EMR.

  • Conecte-se ao seu cluster usando SSH e instale a biblioteca a partir daí.

nota

As informações a seguir usam a versão 3.1 do Spark, mas você pode especificar qualquer versão que atenda aos requisitos.

export SPARK_HOME=/usr/lib/spark sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
nota

Se você quiser instalar o dependente JARs automaticamente no SPARK_HOME, não use a etapa de bootstrap.

Instalação em uma instância de SageMaker notebook

Instale uma versão compatível com o conector Spark usando os seguintes comandos: PySpark

!pip3 install pyspark==3.1.1 !pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Se você estiver realizando a ingestão em lote no armazenamento offline, as dependências não estarão dentro do ambiente da instância do caderno.

from pyspark.sql import SparkSession import feature_store_pyspark extra_jars = ",".join(feature_store_pyspark.classpath_jars()) spark = SparkSession.builder \ .config("spark.jars", extra_jars) \ .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \ .getOrCreate()

Instalação em cadernos com GIS

Importante

Você deve usar a AWS Glue versão 2.0 ou posterior.

Use as informações a seguir para ajudá-lo a instalar o PySpark conector em uma sessão AWS Glue interativa (GIS).

O Amazon SageMaker Feature Store Spark exige que um conector JAR específico do Spark durante a inicialização da sessão seja carregado em seu bucket do Amazon S3. Para obter mais informações sobre como carregar o JAR necessário para o bucket do S3, consulte Recuperar o JAR para o Feature Store Spark.

Depois de fazer o upload do JAR, você deve fornecer o JAR às sessões do GIS usando o comando a seguir.

%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar

Para instalar o Feature Store Spark em AWS Glue tempo de execução, use o comando %additional_python_modules mágico no notebook GIS. AWS Glue é executado pip nos módulos que você especificou abaixo%additional_python_modules.

%additional_python_modules sagemaker-feature-store-pyspark-3.1

Antes de iniciar a AWS Glue sessão, você deve usar os dois comandos mágicos anteriores.

Instalação em um AWS Glue trabalho

Importante

Você deve usar a AWS Glue versão 2.0 ou posterior.

Para instalar o conector Spark em uma AWS Glue tarefa, use o --extra-jars argumento para fornecer o necessário JARs e --additional-python-modules instalar o conector Spark como parâmetros da tarefa ao criar a AWS Glue tarefa, conforme mostrado no exemplo a seguir. Para obter mais informações sobre como carregar o JAR necessário para o bucket do S3, consulte Recuperar o JAR para o Feature Store Spark.

glue_client = boto3.client('glue', region_name=region) response = glue_client.create_job( Name=pipeline_id, Description='Feature Store Compute Job', Role=glue_role_arn, ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run}, Command={ 'Name': 'glueetl', 'ScriptLocation': script_location_uri, 'PythonVersion': '3' }, DefaultArguments={ '--TempDir': temp_dir_location_uri, '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1', '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar", ... }, MaxRetries=3, NumberOfWorkers=149, Timeout=2880, GlueVersion='3.0', WorkerType='G.2X' )

Instalação em uma tarefa de SageMaker processamento da Amazon

Para usar o Feature Store Spark com trabalhos SageMaker de processamento da Amazon, traga sua própria imagem. Para obter informações sobre como trazer sua própria imagem, consulte Traga sua própria imagem de SageMaker IA. Adicione a etapa de instalação a um Dockerfile. Depois de enviar a imagem do Docker para um repositório Amazon ECR, você pode usar o PySparkProcessor para criar o trabalho de processamento. Para obter mais informações sobre a criação de uma tarefa de processamento com o PySpark processador, consulteExecutar um Trabalho de Processamento com o Apache Spark.

Veja a seguir um exemplo da adição de uma etapa de instalação ao Dockerfile.

FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0 RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose

Recuperar o JAR para o Feature Store Spark

Para recuperar o JAR de dependência do Feature Store Spark, você deve instalar o conector Spark a partir do repositório Python Package Index (PyPI) usando pip em qualquer ambiente Python com acesso à rede. Um notebook SageMaker Jupyter é um exemplo de ambiente Python com acesso à rede.

O comando a seguir instala o conector Spark.

!pip install sagemaker-feature-store-pyspark-3.1

Depois de instalar o Feature Store Spark, você pode recuperar a localização do JAR e fazer o upload do JAR para o Amazon S3.

O comando feature-store-pyspark-dependency-jars fornece a localização do JAR de dependência necessário que o Feature Store Spark adicionou. Você pode usar o comando para recuperar o JAR e carregá-lo no Amazon S3.

jar_location = !feature-store-pyspark-dependency-jars jar_location = jar_location[0] s3_client = boto3.client("s3") s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")

Implementações de exemplos

Example Python script

FeatureStoreBatchIngestion.py

from pyspark.sql import SparkSession from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager import feature_store_pyspark spark = SparkSession.builder \ .getOrCreate() # Construct test DataFrame columns = ["RecordIdentifier", "EventTime"] data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")] df = spark.createDataFrame(data).toDF(*columns) # Initialize FeatureStoreManager with a role arn if your feature group is created by another account feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") # Load the feature definitions from input schema. The feature definitions can be used to create a feature group feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df) feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" # Ingest by default. The connector will leverage PutRecord API to ingest your data in stream # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn) # To select the target stores for ingestion, you can specify the target store as the paramter # If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"]) # If only OfflineStore is selected, the connector will batch write the data to offline store directly feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"]) # To retrieve the records failed to be ingested by spark connector failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()

Envie um trabalho do Spark com um exemplo de script Python

A PySpark versão exige que um JAR extra dependente seja importado, portanto, etapas extras são necessárias para executar o aplicativo Spark.

Se você não especificou SPARK_HOME durante a instalação, precisará carregar o necessário JARs na JVM durante a execução. spark-submit feature-store-pyspark-dependency-jarsé um script Python instalado pela biblioteca Spark para buscar automaticamente o caminho para tudo para você. JARs

spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py

Se você estiver executando esse aplicativo no Amazon EMR, recomendamos que você execute o aplicativo no modo cliente, para que você não precise distribuir o dependente JARs para outros nós de tarefas. Adicione mais uma etapa no cluster do Amazon EMR com o argumento Spark semelhante ao seguinte:

spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
Example Scala script

FeatureStoreBatchIngestion.scala

import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object TestSparkApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate() // Construct test DataFrame val data = List( Row("1", "2021-07-01T12:20:12Z"), Row("2", "2021-07-02T12:20:13Z"), Row("3", "2021-07-03T12:20:14Z") ) val schema = StructType( List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType)) ) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) // Initialize FeatureStoreManager with a role arn if your feature group is created by another account val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") // Load the feature definitions from input schema. The feature definitions can be used to create a feature group val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df) val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html featureStoreManager.ingestData(df, featureGroupArn) // To select the target stores for ingestion, you can specify the target store as the paramter // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore")) // If only OfflineStore is selected, the connector will batch write the data to offline store directly featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"]) // To retrieve the records failed to be ingested by spark connector val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame() } }

Enviar Tarefas do Spark

Scala

Você pode usar o Feature Store Spark como uma dependência normal. Nenhuma instrução extra é necessária para executar a aplicação em todas as plataformas.

FeatureStoreBatchIngestion.py

from pyspark.sql import SparkSession from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager import feature_store_pyspark spark = SparkSession.builder \ .getOrCreate() # Construct test DataFrame columns = ["RecordIdentifier", "EventTime"] data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")] df = spark.createDataFrame(data).toDF(*columns) # Initialize FeatureStoreManager with a role arn if your feature group is created by another account feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") # Load the feature definitions from input schema. The feature definitions can be used to create a feature group feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df) feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" # Ingest by default. The connector will leverage PutRecord API to ingest your data in stream # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn) # To select the target stores for ingestion, you can specify the target store as the paramter # If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"]) # If only OfflineStore is selected, the connector will batch write the data to offline store directly feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"]) # To retrieve the records failed to be ingested by spark connector failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()

Envie um trabalho do Spark com um exemplo de script Python

A PySpark versão exige que um JAR extra dependente seja importado, portanto, etapas extras são necessárias para executar o aplicativo Spark.

Se você não especificou SPARK_HOME durante a instalação, precisará carregar o necessário JARs na JVM durante a execução. spark-submit feature-store-pyspark-dependency-jarsé um script Python instalado pela biblioteca Spark para buscar automaticamente o caminho para tudo para você. JARs

spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py

Se você estiver executando esse aplicativo no Amazon EMR, recomendamos que você execute o aplicativo no modo cliente, para que você não precise distribuir o dependente JARs para outros nós de tarefas. Adicione mais uma etapa no cluster do Amazon EMR com o argumento Spark semelhante ao seguinte:

spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
PrivacidadeTermos do sitePreferências de cookies
© 2025, Amazon Web Services, Inc. ou suas afiliadas. Todos os direitos reservados.