Les traductions sont fournies par des outils de traduction automatique. En cas de conflit entre le contenu d'une traduction et celui de la version originale en anglais, la version anglaise prévaudra.
Cette section fournit des exemples d'implémentations de sources de données personnalisées pour les intégrateurs de fonctionnalités. Pour plus d'informations sur les sources de données personnalisées, consultez Sources de données personnalisées.
La sécurité est une responsabilité partagée entre AWS et nos clients. AWS est chargé de protéger l'infrastructure qui gère les services dans le AWS Cloud. Les clients sont responsables de toutes leurs tâches de configuration et de gestion de sécurité nécessaires. Par exemple, des secrets tels que les informations d'identification d'accès aux magasins de données ne doivent pas être codés en dur dans vos sources de données personnalisées. Vous pouvez les utiliser AWS Secrets Manager pour gérer ces informations d'identification. Pour plus d'informations sur Secrets Manager, consultez Qu'est-ce que c'est AWS Secrets Manager ? dans le guide de AWS Secrets Manager l'utilisateur. Les exemples suivants utilisent Secrets Manager pour vos informations d'identification.
Rubriques
Exemples de sources de données personnalisées Amazon Redshift Clusters (JDBC)
Amazon Redshift propose un pilote JDBC qui peut être utilisé pour lire des données avec Spark. Pour obtenir des informations sur le téléchargement du pilote JDBC Amazon Redshift, consultez Téléchargement du pilote JDBC Amazon Redshift version 2.1.
Pour créer la classe de sources de données Amazon Redshift personnalisée, vous devez remplacer la méthode read_data
à partir des Sources de données personnalisées.
Pour vous connecter à un cluster Amazon Redshift, vous avez besoin des éléments suivants :
-
URL JDBC Amazon Redshift (
)jdbc-url
Pour obtenir des informations sur l'obtention de votre URL JDBC Amazon Redshift, consultez Obtention de l'URL JDBC dans le Guide du développeur de base de données Amazon Redshift.
-
Nom d'utilisateur (
) et mot de passe (redshift-user
) Amazon Redshiftredshift-password
Pour obtenir des informations sur la manière de créer et de gérer des utilisateurs de base de données à l'aide des commandes SQL Amazon Redshift, consultez Utilisateurs dans le Guide du développeur de base de données Amazon Redshift.
-
Nom de table Amazon Redshift (
)redshift-table-name
Pour obtenir des informations sur la manière de créer une table à partir de quelques exemples, consultez CREATE TABLE dans le Guide du développeur de base de données Amazon Redshift.
-
(Facultatif) Si vous utilisez Secrets Manager, vous avez besoin du nom du secret (
) dans lequel vous stockez votre nom d'utilisateur et votre mot de passe d'accès à Amazon Redshift dans Secrets Manager.secret-redshift-account-info
Pour plus d'informations sur Secrets Manager, voir Rechercher des secrets AWS Secrets Manager dans le Guide de AWS Secrets Manager l'utilisateur.
-
Région AWS (
)your-region
Pour en savoir plus sur l'obtention du nom de région de votre session en cours à l'aide du kit SDK pour Python (Boto3), consultez region_name
dans la documentation de Boto3.
L'exemple suivant montre comment récupérer l'URL JDBC et le jeton d'accès personnel depuis Secrets Manager et comment remplacer read_data
pour votre classe de sources de données personnalisée, DatabricksDataSource
.
from sagemaker.feature_store.feature_processor import PySparkDataSource import json import boto3 class RedshiftDataSource(PySparkDataSource): data_source_name = "Redshift" data_source_unique_id = "
redshift-resource-arn
" def read_data(self, spark, params): url = "jdbc-url
?user=redshift-user
&password=redshift-password
" aws_iam_role_arn = "redshift-command-access-role
" secret_name = "secret-redshift-account-info
" region_name = "your-region
" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) jdbc_url = url.replace("jdbc-url
", secrets["jdbcurl"]).replace("redshift-user
", secrets['username']).replace("redshift-password
", secrets['password']) return spark.read \ .format("jdbc") \ .option("url", url) \ .option("driver", "com.amazon.redshift.Driver") \ .option("dbtable", "redshift-table-name
") \ .option("tempdir", "s3a://your-bucket-name
/your-bucket-prefix
") \ .option("aws_iam_role", aws_iam_role_arn) \ .load()
L'exemple suivant montre comment connecter RedshiftDataSource
à votre décorateur feature_processor
.
from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[RedshiftDataSource()], output="
feature-group-arn
", target_stores=["OfflineStore"], spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"} ) def transform(input_df): return input_df
Pour exécuter la tâche de l'intégrateur de fonctionnalités à distance, vous devez fournir le pilote jdbc en définissant SparkConfig
et le transmettre au décorateur @remote
.
from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[RedshiftDataSource()], output="
feature-group-arn
", target_stores=["OfflineStore"], ) def transform(input_df): return input_df
Exemples de sources de données personnalisées Snowflake
Snowflake fournit un connecteur Spark qui peut être utilisé pour votre décorateur feature_processor
. Pour obtenir des informations sur le connecteur Snowflake pour Spark, consultez Connecteur Snowflake pour Spark
Pour créer la classe de sources de données Snowflake personnalisée, vous devez remplacer la méthode read_data
à partir des Sources de données personnalisées et ajouter les packages du connecteur Spark au chemin de classe Spark.
Pour vous connecter à une source de données Snowflake, vous avez besoin des éléments suivants :
-
URL Snowflake (
)sf-url
URLs Pour plus d'informations sur l'accès aux interfaces Web de Snowflake, consultez la section Identifiants de compte
dans la documentation de Snowflake. -
Base de données Snowflake (
)sf-database
Pour obtenir des informations sur l'obtention du nom de votre base de données à l'aide de Snowflake, consultez CURRENT_DATABASE
dans la documentation de Snowflake. -
Schéma de base de données Snowflake (
)sf-schema
Pour en savoir plus sur l'obtention du nom de votre schéma à l'aide de Snowflake, consultez CURRENT_SCHEMA
dans la documentation de Snowflake. -
Entrepôt Snowflake (
)sf-warehouse
Pour obtenir des informations sur l'obtention du nom de votre entrepôt à l'aide de Snowflake, consultez CURRENT_WAREHOUSE
dans la documentation de Snowflake. -
Nom de table Snowflake (
)sf-table-name
-
(Facultatif) Si vous utilisez Secrets Manager, vous avez besoin du nom du secret (
) dans lequel vous stockez votre nom d'utilisateur et votre mot de passe d'accès à Snowflake dans Secrets Manager.secret-snowflake-account-info
Pour plus d'informations sur Secrets Manager, voir Rechercher des secrets AWS Secrets Manager dans le Guide de AWS Secrets Manager l'utilisateur.
-
Région AWS (
)your-region
Pour en savoir plus sur l'obtention du nom de région de votre session en cours à l'aide du kit SDK pour Python (Boto3), consultez region_name
dans la documentation de Boto3.
L'exemple suivant montre comment récupérer le nom d'utilisateur et le mot de passe Snowflake depuis Secrets Manager et comment remplacer la fonction read_data
pour votre classe de sources de données personnalisée SnowflakeDataSource
.
from sagemaker.feature_store.feature_processor import PySparkDataSource from sagemaker.feature_store.feature_processor import feature_processor import json import boto3 class SnowflakeDataSource(PySparkDataSource): sf_options = { "sfUrl" : "
sf-url
", "sfDatabase" : "sf-database
", "sfSchema" : "sf-schema
", "sfWarehouse" : "sf-warehouse
", } data_source_name = "Snowflake" data_source_unique_id = "sf-url
" def read_data(self, spark, params): secret_name = "secret-snowflake-account-info
" region_name = "your-region
" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) self.sf_options["sfUser"] = secrets.get("username") self.sf_options["sfPassword"] = secrets.get("password") return spark.read.format("net.snowflake.spark.snowflake") \ .options(**self.sf_options) \ .option("dbtable", "sf-table-name
") \ .load()
L'exemple suivant montre comment connecter SnowflakeDataSource
à votre décorateur feature_processor
.
from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[SnowflakeDataSource()], output=
feature-group-arn
, target_stores=["OfflineStore"], spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"} ) def transform(input_df): return input_df
Pour exécuter la tâche de l'intégrateur de fonctionnalités à distance, vous devez fournir les packages en définissant SparkConfig
et les transmettre au décorateur @remote
. Dans l'exemple suivant, les packages Spark sont tels que spark-snowflake_2.12
correspond à la version Scala de l'intégrateur de fonctionnalités, 2.12.0
à la version de Snowflake que vous souhaitez utiliser et spark_3.3
à la version Spark de l'intégrateur de fonctionnalités.
from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[SnowflakeDataSource()], output="
feature-group-arn>
", target_stores=["OfflineStore"], ) def transform(input_df): return input_df
Exemples de sources de données personnalisées Databricks (JDBC)
Spark peut lire les données de Databricks à l'aide du pilote JDBC Databricks. Pour obtenir des informations sur le pilote JDBC Databricks, consultez Configuration des pilotes ODBC et JDBC Databricks
Note
Vous pouvez lire les données de n'importe quelle autre base de données en incluant le pilote JDBC correspondant dans le chemin de classe Spark. Pour plus d'informations, consultez JDBC vers d'autres bases de données
Pour créer la classe de sources de données Databricks personnalisée, vous devez remplacer la méthode read_data
à partir des Sources de données personnalisées et ajouter le fichier JAR JDBC au chemin de classe Spark.
Pour vous connecter à une source de données Databricks, vous avez besoin des éléments suivants :
-
URL Databricks (
)databricks-url
Pour obtenir des informations sur votre URL Databricks, consultez Création de l'URL de connexion pour le pilote Databricks
(langue française non garantie) dans la documentation de Databricks. -
Jeton d'accès personnel Databricks (
)personal-access-token
Pour obtenir des informations sur votre jeton d'accès Databricks, consultez Authentification par jeton d'accès personnel Databricks
(langue française non garantie) dans la documentation de Databricks. -
Nom de catalogue de données (
)db-catalog
Pour obtenir des informations sur le nom de votre catalogue Databricks, consultez Nom de catalogue
(langue française non garantie) dans la documentation de Databricks. -
Nom de schéma (
)db-schema
Pour obtenir des informations sur le nom de votre schéma Databricks, consultez Nom de schéma
(langue française non garantie) dans la documentation de Databricks. -
Nom de table (
)db-table-name
Pour obtenir des informations sur le nom de votre table Databricks, consultez Nom de table
(langue française non garantie) dans la documentation de Databricks. -
(Facultatif) Si vous utilisez Secrets Manager, vous avez besoin du nom du secret (
) dans lequel vous stockez votre nom d'utilisateur et votre mot de passe d'accès à Databricks dans Secrets Manager.secret-databricks-account-info
Pour plus d'informations sur Secrets Manager, voir Rechercher des secrets AWS Secrets Manager dans le Guide de AWS Secrets Manager l'utilisateur.
-
Région AWS (
)your-region
Pour en savoir plus sur l'obtention du nom de région de votre session en cours à l'aide du kit SDK pour Python (Boto3), consultez region_name
dans la documentation de Boto3.
L'exemple suivant montre comment récupérer l'URL JDBC et le jeton d'accès personnel depuis Secrets Manager et comment remplacer read_data
pour votre classe de sources de données personnalisée DatabricksDataSource
.
from sagemaker.feature_store.feature_processor import PySparkDataSource import json import boto3 class DatabricksDataSource(PySparkDataSource): data_source_name = "Databricks" data_source_unique_id = "
databricks-url
" def read_data(self, spark, params): secret_name = "secret-databricks-account-info
" region_name = "your-region
" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) jdbc_url = secrets["jdbcurl"].replace("personal-access-token
", secrets['pwd']) return spark.read.format("jdbc") \ .option("url", jdbc_url) \ .option("dbtable","`db-catalog
`.`db-schema
`.`db-table-name
`") \ .option("driver", "com.simba.spark.jdbc.Driver") \ .load()
L'exemple suivant montre comment charger le fichier JAR de pilote JDBC,
, sur Amazon S3 afin de l'ajouter au chemin de classe Spark. Pour obtenir des informations sur le téléchargement du pilote JDBC Spark (jdbc-jar-file-name
.jar
) depuis Databricks, consultez Téléchargement du pilote JDBCjdbc-jar-file-name
.jar
from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[DatabricksDataSource()], output=
feature-group-arn
, target_stores=["OfflineStore"], spark_config={"spark.jars": "s3://your-bucket-name
/your-bucket-prefix
/jdbc-jar-file-name
.jar"} ) def transform(input_df): return input_df
Pour exécuter la tâche de l'intégrateur de fonctionnalités à distance, vous devez fournir les fichiers JAR en définissant SparkConfig
et les transmettre au décorateur @remote
.
from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars": "s3://
your-bucket-name
/your-bucket-prefix
/jdbc-jar-file-name
.jar" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[DatabricksDataSource()], output="feature-group-arn
", target_stores=["OfflineStore"], ) def transform(input_df): return input_df
Exemples de sources de données personnalisées en streaming
Vous pouvez vous connecter à des sources de données de streaming telles qu'Amazon Kinesis, et créer des transformations avec Spark Structured Streaming pour lire à partir de sources de données de streaming. Pour plus d'informations sur le connecteur Kinesis, consultez la section Connecteur Kinesis pour Spark Structured Streaming
Pour créer la classe de source de données Amazon Kinesis personnalisée, vous devez étendre la BaseDataSource
classe et remplacer la méthode à partir deread_data
. Sources de données personnalisées
Pour vous connecter à un flux de données Amazon Kinesis, vous devez :
-
Kinesis ARN ()
kinesis-resource-arn
Pour plus d'informations sur le flux de données Kinesis ARNs, consultez Amazon Resource Names (ARNs) for Kinesis Data Streams dans le manuel Amazon Kinesis Developer Guide.
-
Nom du flux de données Kinesis ()
kinesis-stream-name
-
Région AWS (
)your-region
Pour en savoir plus sur l'obtention du nom de région de votre session en cours à l'aide du kit SDK pour Python (Boto3), consultez region_name
dans la documentation de Boto3.
from sagemaker.feature_store.feature_processor import BaseDataSource from sagemaker.feature_store.feature_processor import feature_processor class KinesisDataSource(BaseDataSource): data_source_name = "Kinesis" data_source_unique_id = "
kinesis-resource-arn
" def read_data(self, spark, params): return spark.readStream.format("kinesis") \ .option("streamName", "kinesis-stream-name
") \ .option("awsUseInstanceProfile", "false") \ .option("endpointUrl", "https://kinesis..amazonaws.com") .load()
your-region
L'exemple suivant montre comment se connecter KinesisDataSource
à votre feature_processor
décorateur.
from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig import feature_store_pyspark.FeatureStoreManager as fsm def ingest_micro_batch_into_fg(input_df, epoch_id): feature_group_arn = "
feature-group-arn
" fsm.FeatureStoreManager().ingest_data( input_data_frame = input_df, feature_group_arn = feature_group_arn ) @remote( spark_config=SparkConfig( configuration={ "Classification": "spark-defaults", "Properties":{ "spark.sql.streaming.schemaInference": "true", "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2" } } ), instance_type="ml.m5.2xlarge", max_runtime_in_seconds=2419200 # 28 days ) @feature_processor( inputs=[KinesisDataSource()], output="feature-group-arn
" ) def transform(input_df): output_stream = ( input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)") .writeStream.foreachBatch(ingest_micro_batch_into_fg) .trigger(processingTime="1 minute") .option("checkpointLocation", "s3a://checkpoint-path
") .start() ) output_stream.awaitTermination()
Dans l'exemple de code ci-dessus, nous utilisons quelques options de diffusion structurée de Spark pour diffuser des microlots dans votre groupe de fonctionnalités. Pour une liste complète des options, consultez le guide de programmation en streaming structuré
-
Le mode
foreachBatch
récepteur est une fonctionnalité qui vous permet d'appliquer des opérations et d'écrire de la logique sur les données de sortie de chaque microlot d'une requête de streaming.Pour plus d'informations
foreachBatch
, consultez la section Utilisation de Foreach etle guide ForeachBatch de programmation de streaming structuré d'Apache Spark. -
L'
checkpointLocation
option enregistre régulièrement l'état de l'application de streaming. Le journal de diffusion est enregistré à l'emplacement
du point de contrôle.s3a://checkpoint-path
Pour plus d'informations sur
checkpointLocation
cette option, consultez la section Restaurer après un échec avec le pointage de contrôledans le guide de programmation de streaming structuré d'Apache Spark. -
Le
trigger
paramètre définit la fréquence à laquelle le traitement par microbatch est déclenché dans une application de streaming. Dans l'exemple, le type de déclencheur du temps de traitement est utilisé avec des intervalles de microlots d'une minute, spécifiés par.trigger(processingTime="1 minute")
Pour effectuer un remblayage à partir d'une source de flux, vous pouvez utiliser le type de déclencheur available-now, spécifié par.trigger(availableNow=True)
Pour une liste complète des
trigger
types, consultez la section Déclencheursdu guide de programmation de streaming structuré d'Apache Spark.
Streaming continu et tentatives automatiques à l'aide de déclencheurs basés sur des événements
Le Feature Processor utilise la SageMaker formation comme infrastructure de calcul et sa durée d'exécution maximale est de 28 jours. Vous pouvez utiliser des déclencheurs basés sur des événements pour prolonger votre diffusion continue sur une plus longue période et vous remettre en cas de défaillance passagère. Pour plus d'informations sur les exécutions basées sur le calendrier et les événements, consultezExécutions planifiées et basées sur des événements pour les pipelines de processeurs de fonctionnalités.
Voici un exemple de configuration d'un déclencheur basé sur un événement pour assurer le fonctionnement continu du pipeline du processeur de fonctionnalités de streaming. Cela utilise la fonction de transformation en continu définie dans l'exemple précédent. Un pipeline cible peut être configuré pour être déclenché lorsqu'un FAILED
événement STOPPED
ou se produit pour l'exécution d'un pipeline source. Notez que le même pipeline est utilisé comme source et cible afin qu'il fonctionne en continu.
import sagemaker.feature_store.feature_processor as fp from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus streaming_pipeline_name = "
streaming-pipeline
" streaming_pipeline_arn = fp.to_pipeline( pipeline_name = streaming_pipeline_name, step = transform # defined in previous section ) fp.put_trigger( source_pipeline_events=FeatureProcessorPipelineEvents( pipeline_name=source_pipeline_name, pipeline_execution_status=[ FeatureProcessorPipelineExecutionStatus.STOPPED, FeatureProcessorPipelineExecutionStatus.FAILED] ), target_pipeline=target_pipeline_name )