As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.
O Amazon SageMaker Feature Store Spark é um conector Spark que conecta a biblioteca Spark à Feature Store. O Feature Store Spark simplifica a ingestão de dados do Spark DataFrame
para grupos de atributos. O Feature Store suporta a ingestão de dados em lote com o Spark, usando seu pipeline de ETL existente, no Amazon EMR, no GIS, em um trabalho, em um trabalho AWS Glue de SageMaker processamento da Amazon ou em um notebook. SageMaker
Métodos para instalar e implantar a ingestão de dados em lote são fornecidos para desenvolvedores de Python e Scala. Os desenvolvedores de Python podem usar a biblioteca sagemaker-feature-store-pyspark
Python de código aberto para desenvolvimento local, instalação no Amazon EMR e para notebooks Jupyter seguindo as instruções no repositório Spark da Amazon Feature Store. SageMaker GitHub
Você pode usar o conector Spark para ingerir dados das seguintes formas, dependendo se o armazenamento on-line, o armazenamento offline ou ambos estão habilitados:
-
Ingestão por padrão — Se a loja virtual estiver ativada, o conector Spark primeiro ingere seu dataframe na loja virtual usando a API. PutRecord Apenas o registro com o maior horário do evento permanecerá no armazenamento on-line. Se o armazenamento offline estiver habilitado, em 15 minutos, o Feature Store ingerirá seu dataframe no armazenamento offline. Para obter mais informações sobre como os armazenamentos on-line e offline funcionam, consulte Conceitos do Feature Store.
Você pode fazer isso não especificando
target_stores
no método.ingest_data(...)
. -
Ingestão direta do armazenamento offline: Se o armazenamento offline estiver habilitado, o lote do conector Spark ingere seu dataframe diretamente no armazenamento offline. A ingestão do dataframe diretamente no armazenamento offline não atualiza o armazenamento on-line.
Você pode fazer isso definindo
target_stores=["OfflineStore"]
no método.ingest_data(...)
. -
Somente loja virtual — Se a loja virtual estiver ativada, o conector Spark ingere seu dataframe na loja virtual usando a API. PutRecord A ingestão do dataframe diretamente no armazenamento on-line não atualiza o armazenamento offline.
Você pode fazer isso definindo
target_stores=["OnlineStore"]
no método.ingest_data(...)
.
Para obter informações sobre como usar diferentes métodos de ingestão, consulte Implementações de exemplos.
Tópicos
Instalação do Feature Store Spark
Usuários do Scala
O SDK do Spark da Feature Store está disponível no repositório central Maven do SDK Spark da Amazon SageMaker Feature Store
Requisitos
-
Spark >= 3.0.0 e <= 3.3.0
-
iceberg-spark-runtime
>= 0.14.0 -
Scala >= 2.12.x
-
Amazon EMR >= 6.1.0 (somente se você estiver usando o Amazon EMR)
Declarar a dependência em POM.xml
O conector do Feature Store Spark depende da biblioteca iceberg-spark-runtime
. Portanto, você deve adicionar a versão correspondente da biblioteca iceberg-spark-runtime
à dependência se estiver ingerindo dados em um grupo de atributos que você criou automaticamente com o formato de tabela Iceberg. Por exemplo, se você estiver usando o Spark 3.1, você deve declarar o seguinte no seu projeto POM.xml
:
<dependency>
<groupId>software.amazon.sagemaker.featurestore</groupId>
<artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId>
<version>1.0.0</version>
</dependency>
<dependency>
<groupId>org.apache.iceberg</groupId>
<artifactId>iceberg-spark-runtime-3.1_2.12</artifactId>
<version>0.14.0</version>
</dependency>
Usuários de Python
O SDK do Feature Store Spark está disponível no repositório de código aberto SageMaker Amazon Feature Store GitHub
Requisitos
-
Spark >= 3.0.0 e <= 3.3.0
-
Amazon EMR >= 6.1.0 (somente se você estiver usando o Amazon EMR)
-
Kernel =
conda_python3
Recomendamos configurar o $SPARK_HOME
para o diretório em que você tem o Spark instalado. Durante a instalação, o Feature Store carrega o JAR necessário para SPARK_HOME
, para que as dependências sejam carregadas automaticamente. É necessário iniciar uma JVM pelo Spark para fazer essa PySpark biblioteca funcionar.
Instalação local
Para obter mais informações sobre a instalação, habilite o modo detalhado anexando --verbose
ao seguinte comando de instalação:
pip3 install sagemaker-feature-store-pyspark-
3.1
--no-binary :all:
Instalação no Amazon EMR
Crie um cluster do Amazon EMR com a versão 6.1.0 ou posterior. Habilite o SSH para ajudá-lo a solucionar qualquer problema.
Para instalar a biblioteca, faça o seguinte:
-
Crie uma etapa personalizada no Amazon EMR.
-
Conecte-se ao seu cluster usando SSH e instale a biblioteca a partir daí.
nota
As informações a seguir usam a versão 3.1 do Spark, mas você pode especificar qualquer versão que atenda aos requisitos.
export SPARK_HOME=/usr/lib/spark sudo -E pip3 install sagemaker-feature-store-pyspark-
3.1
--no-binary :all: --verbose
nota
Se você quiser instalar o dependente JARs automaticamente no SPARK_HOME, não use a etapa de bootstrap.
Instalação em uma instância de SageMaker notebook
Instale uma versão compatível com o conector Spark usando os seguintes comandos: PySpark
!pip3 install pyspark==3.1.1
!pip3 install sagemaker-feature-store-pyspark-3.1
--no-binary :all:
Se você estiver realizando a ingestão em lote no armazenamento offline, as dependências não estarão dentro do ambiente da instância do caderno.
from pyspark.sql import SparkSession
import feature_store_pyspark
extra_jars = ",".join(feature_store_pyspark.classpath_jars())
spark = SparkSession.builder \
.config("spark.jars", extra_jars) \
.config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \
.getOrCreate()
Instalação em cadernos com GIS
Importante
Você deve usar a AWS Glue versão 2.0 ou posterior.
Use as informações a seguir para ajudá-lo a instalar o PySpark conector em uma sessão AWS Glue interativa (GIS).
O Amazon SageMaker Feature Store Spark exige que um conector JAR específico do Spark durante a inicialização da sessão seja carregado em seu bucket do Amazon S3. Para obter mais informações sobre como carregar o JAR necessário para o bucket do S3, consulte Recuperar o JAR para o Feature Store Spark.
Depois de fazer o upload do JAR, você deve fornecer o JAR às sessões do GIS usando o comando a seguir.
%extra_jars s3:/
<YOUR_BUCKET>
/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar
Para instalar o Feature Store Spark em AWS Glue tempo de execução, use o comando %additional_python_modules
mágico no notebook GIS. AWS Glue é executado pip
nos módulos que você especificou abaixo%additional_python_modules
.
%additional_python_modules sagemaker-feature-store-pyspark-
3.1
Antes de iniciar a AWS Glue sessão, você deve usar os dois comandos mágicos anteriores.
Instalação em um AWS Glue trabalho
Importante
Você deve usar a AWS Glue versão 2.0 ou posterior.
Para instalar o conector Spark em uma AWS Glue tarefa, use o --extra-jars
argumento para fornecer o necessário JARs e --additional-python-modules
instalar o conector Spark como parâmetros da tarefa ao criar a AWS Glue tarefa, conforme mostrado no exemplo a seguir. Para obter mais informações sobre como carregar o JAR necessário para o bucket do S3, consulte Recuperar o JAR para o Feature Store Spark.
glue_client = boto3.client('glue', region_name=region)
response = glue_client.create_job(
Name=pipeline_id,
Description='Feature Store Compute Job',
Role=glue_role_arn,
ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run},
Command={
'Name': 'glueetl',
'ScriptLocation': script_location_uri,
'PythonVersion': '3'
},
DefaultArguments={
'--TempDir': temp_dir_location_uri,
'--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1',
'--extra-jars': "s3:/<YOUR_BUCKET>
/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar",
...
},
MaxRetries=3,
NumberOfWorkers=149,
Timeout=2880,
GlueVersion='3.0',
WorkerType='G.2X'
)
Instalação em uma tarefa de SageMaker processamento da Amazon
Para usar o Feature Store Spark com trabalhos SageMaker de processamento da Amazon, traga sua própria imagem. Para obter informações sobre como trazer sua própria imagem, consulte Traga sua própria imagem de SageMaker IA. Adicione a etapa de instalação a um Dockerfile. Depois de enviar a imagem do Docker para um repositório Amazon ECR, você pode usar o PySparkProcessor para criar o trabalho de processamento. Para obter mais informações sobre a criação de uma tarefa de processamento com o PySpark processador, consulteExecutar um Trabalho de Processamento com o Apache Spark.
Veja a seguir um exemplo da adição de uma etapa de instalação ao Dockerfile.
FROM
<ACCOUNT_ID>
.dkr.ecr.<AWS_REGION>
.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0 RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
Recuperar o JAR para o Feature Store Spark
Para recuperar o JAR de dependência do Feature Store Spark, você deve instalar o conector Spark a partir do repositório Python Package Index (PyPI) usando pip
em qualquer ambiente Python com acesso à rede. Um notebook SageMaker Jupyter é um exemplo de ambiente Python com acesso à rede.
O comando a seguir instala o conector Spark.
!pip install sagemaker-feature-store-pyspark-
3.1
Depois de instalar o Feature Store Spark, você pode recuperar a localização do JAR e fazer o upload do JAR para o Amazon S3.
O comando feature-store-pyspark-dependency-jars
fornece a localização do JAR de dependência necessário que o Feature Store Spark adicionou. Você pode usar o comando para recuperar o JAR e carregá-lo no Amazon S3.
jar_location = !feature-store-pyspark-dependency-jars
jar_location = jar_location[0]
s3_client = boto3.client("s3")
s3_client.upload_file(jar_location, "<YOUR_BUCKET>
","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")
Implementações de exemplos
FeatureStoreBatchIngestion.py
from pyspark.sql import SparkSession
from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager
import feature_store_pyspark
spark = SparkSession.builder \
.getOrCreate()
# Construct test DataFrame
columns = ["RecordIdentifier", "EventTime"]
data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")]
df = spark.createDataFrame(data).toDF(*columns)
# Initialize FeatureStoreManager with a role arn if your feature group is created by another account
feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn")
# Load the feature definitions from input schema. The feature definitions can be used to create a feature group
feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df)
feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>
:<ACCOUNT_ID>
:feature-group/<YOUR_FEATURE_GROUP_NAME>
"
# Ingest by default. The connector will leverage PutRecord API to ingest your data in stream
# https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn)
# To select the target stores for ingestion, you can specify the target store as the paramter
# If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"])
# If only OfflineStore is selected, the connector will batch write the data to offline store directly
feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"])
# To retrieve the records failed to be ingested by spark connector
failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()
Envie um trabalho do Spark com um exemplo de script Python
A PySpark versão exige que um JAR extra dependente seja importado, portanto, etapas extras são necessárias para executar o aplicativo Spark.
Se você não especificou SPARK_HOME
durante a instalação, precisará carregar o necessário JARs na JVM durante a execução. spark-submit
feature-store-pyspark-dependency-jars
é um script Python instalado pela biblioteca Spark para buscar automaticamente o caminho para tudo para você. JARs
spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py
Se você estiver executando esse aplicativo no Amazon EMR, recomendamos que você execute o aplicativo no modo cliente, para que você não precise distribuir o dependente JARs para outros nós de tarefas. Adicione mais uma etapa no cluster do Amazon EMR com o argumento Spark semelhante ao seguinte:
spark-submit --deploy-mode client --master yarn s3:/
<PATH_TO_SCRIPT>
/FeatureStoreBatchIngestion.py