Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.
Dieser Abschnitt enthält Beispiele für Implementierungen benutzerdefinierter Datenquellen für Feature-Prozessoren. Weitere Informationen zu gemeinsamen Datenquellen finden Sie unter Benutzerdefinierte Datenquellen.
Sicherheit ist eine gemeinsame Verantwortung zwischen unseren Kunden AWS und unseren Kunden. AWS ist verantwortlich für den Schutz der Infrastruktur, auf der die Dienste in der ausgeführt AWS Cloud werden. Kunden sind für alle erforderlichen Sicherheitskonfigurations- und Verwaltungsaufgaben verantwortlich. Beispielsweise sollten Geheimnisse wie Zugangsdaten für Datenspeicher in Ihren benutzerdefinierten Datenquellen nicht fest codiert sein. Sie können diese Anmeldeinformationen AWS Secrets Manager zur Verwaltung verwenden. Informationen zu Secrets Manager finden Sie unter Was ist AWS Secrets Manager? im AWS Secrets Manager Benutzerhandbuch. In den folgenden Beispielen wird Secrets Manager für Ihre Anmeldeinformationen verwendet.
Themen
Beispiele für benutzerdefinierte Amazon Redshift Clusters (JDBC)-Datenquellen
Amazon Redshift bietet einen JDBC-Treiber, der zum Lesen von Daten mit Spark verwendet werden kann. Informationen zum Herunterladen des Amazon Redshift JDBC-Treibers finden Sie unter Herunterladen des Amazon Redshift JDBC-Treibers, Version 2.1.
Um die benutzerdefinierte Amazon Redshift Redshift-Datenquellenklasse zu erstellen, müssen Sie die read_data
Methode aus der Benutzerdefinierte Datenquellen überschreiben.
Um eine Verbindung mit einem Amazon Redshift Redshift-Cluster herzustellen, benötigen Sie:
-
Amazon Redshift JDBC-URL (
)jdbc-url
Informationen zum Abrufen Ihrer Amazon Redshift JDBC-URL finden Sie unter Getting the JDBC URL im Datenbankentwicklerhandbuch zu Amazon Redshift.
-
Amazon Redshift Redshift-Benutzername (
) und Passwort (redshift-user
)redshift-password
Informationen zum Erstellen und Verwalten von Datenbankbenutzern mithilfe der Amazon-Redshift-SQL-Befehle finden Sie unter Benutzer im Amazon-Redshift-Dabase-Entwicklerhandbuch.
-
Name der Amazon-Redshift-Tabelle (
)redshift-table-name
Informationen zum Erstellen einer Tabelle mit einigen Beispielen finden Sie unter CREATE TABLE im Datenbankentwicklerhandbuch zu Amazon Redshift.
-
(Optional) Wenn Sie Secrets Manager verwenden, benötigen Sie den geheimen Namen (
), unter dem Sie Ihren Amazon Redshift Redshift-Zugangsbenutzernamen und Ihr Passwort auf Secrets Manager speichern.secret-redshift-account-info
Informationen zu Secrets Manager finden Sie unter Find Secrets AWS Secrets Manager im AWS Secrets Manager Benutzerhandbuch.
-
AWS-Region (
)your-region
Informationen zum Abrufen des Regionsnamens Ihrer aktuellen Sitzung mithilfe des SDK for Python (Boto3) finden Sie unter region_name
in der Boto3-Dokumentation.
Das folgende Beispiel zeigt, wie Sie die JDBC-URL und das persönliche Zugriffstoken aus Secrets Manager abrufen und die read_data
für Ihre benutzerdefinierte Datenquellenklasse, DatabricksDataSource
überschreiben.
from sagemaker.feature_store.feature_processor import PySparkDataSource import json import boto3 class RedshiftDataSource(PySparkDataSource): data_source_name = "Redshift" data_source_unique_id = "
redshift-resource-arn
" def read_data(self, spark, params): url = "jdbc-url
?user=redshift-user
&password=redshift-password
" aws_iam_role_arn = "redshift-command-access-role
" secret_name = "secret-redshift-account-info
" region_name = "your-region
" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) jdbc_url = url.replace("jdbc-url
", secrets["jdbcurl"]).replace("redshift-user
", secrets['username']).replace("redshift-password
", secrets['password']) return spark.read \ .format("jdbc") \ .option("url", url) \ .option("driver", "com.amazon.redshift.Driver") \ .option("dbtable", "redshift-table-name
") \ .option("tempdir", "s3a://your-bucket-name
/your-bucket-prefix
") \ .option("aws_iam_role", aws_iam_role_arn) \ .load()
Das folgende Beispiel zeigt, wie Sie eine Verbindung RedshiftDataSource
zu Ihrem feature_processor
Dekorateur herstellen.
from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[RedshiftDataSource()], output="
feature-group-arn
", target_stores=["OfflineStore"], spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"} ) def transform(input_df): return input_df
Um den Featureprozessor-Job remote auszuführen, müssen Sie den JDBC-Treiber bereitstellen, indem Sie ihn definieren SparkConfig
und an den @remote
Decorator übergeben.
from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[RedshiftDataSource()], output="
feature-group-arn
", target_stores=["OfflineStore"], ) def transform(input_df): return input_df
Beispiele für benutzerdefinierte Snowflake-Datenquellen
Snowflake bietet einen Spark-Konnektor, der für Ihren feature_processor
Dekorateur verwendet werden kann. Informationen zum Snowflake-Konnektor für Spark finden Sie unter Snowflake-Konnektor für Spark in der Snowflake-Dokumentation
Um die benutzerdefinierte Snowflake-Datenquellenklasse zu erstellen, müssen Sie die read_data
Methode aus dem Benutzerdefinierte Datenquellen überschreiben und die Spark-Connector-Pakete zum Spark-Klassenpfad hinzufügen.
Um eine Verbindung mit einer Snowflake-Datenquelle herzustellen, benötigen Sie:
-
Snowflake-URL (
)sf-url
Informationen URLs zum Zugriff auf Snowflake-Weboberflächen finden Sie unter Konto-Identifikatoren
in der Snowflake-Dokumentation. -
Snowflake-Datenbank (
)sf-database
Informationen zum Abrufen des Namens Ihrer Datenbank mit Snowflake finden Sie unter CURRENT_DATABASE
in der Snowflake-Dokumentation. -
Snowflake-Datenbankschema (
)sf-schema
Informationen zum Abrufen des Namens Ihres Schemas mithilfe von Snowflake finden Sie unter CURRENT_SCHEMA
in der Snowflake-Dokumentation. -
Snowflake-Warehouse (
)sf-warehouse
Informationen zum Abrufen des Namens Ihres Warehouse mithilfe von Snowflake finden Sie unter CURRENT_WAREHOUSE
in der Snowflake-Dokumentation. -
Name der Snowflake-Tabelle (
)sf-table-name
-
(Optional) Wenn Sie Secrets Manager verwenden, benötigen Sie den geheimen Namen (
), unter dem Sie Ihren Snowflake-Zugriffsbenutzernamen und Ihr Passwort in Secrets Manager speichern.secret-snowflake-account-info
Informationen zu Secrets Manager finden Sie unter Find Secrets AWS Secrets Manager im AWS Secrets Manager Benutzerhandbuch.
-
AWS-Region (
)your-region
Informationen zum Abrufen des Regionsnamens Ihrer aktuellen Sitzung mithilfe des SDK for Python (Boto3) finden Sie unter region_name
in der Boto3-Dokumentation.
Das folgende Beispiel zeigt, wie Sie den Snowflake-Benutzernamen und das Kennwort aus Secrets Manager abrufen und die read_data
Funktion für Ihre benutzerdefinierte Datenquellenklasse überschreiben. SnowflakeDataSource
from sagemaker.feature_store.feature_processor import PySparkDataSource from sagemaker.feature_store.feature_processor import feature_processor import json import boto3 class SnowflakeDataSource(PySparkDataSource): sf_options = { "sfUrl" : "
sf-url
", "sfDatabase" : "sf-database
", "sfSchema" : "sf-schema
", "sfWarehouse" : "sf-warehouse
", } data_source_name = "Snowflake" data_source_unique_id = "sf-url
" def read_data(self, spark, params): secret_name = "secret-snowflake-account-info
" region_name = "your-region
" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) self.sf_options["sfUser"] = secrets.get("username") self.sf_options["sfPassword"] = secrets.get("password") return spark.read.format("net.snowflake.spark.snowflake") \ .options(**self.sf_options) \ .option("dbtable", "sf-table-name
") \ .load()
Das folgende Beispiel zeigt, wie Sie eine Verbindung SnowflakeDataSource
zu Ihrem feature_processor
Dekorateur herstellen.
from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[SnowflakeDataSource()], output=
feature-group-arn
, target_stores=["OfflineStore"], spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"} ) def transform(input_df): return input_df
Um den Feature-Prozessor-Job remote auszuführen, müssen Sie die Pakete per Definition SparkConfig
bereitstellen und an den @remote
Decorator übergeben. Bei den Spark-Paketen im folgenden Beispiel handelt es sich um die spark-snowflake_2.12
Feature-Prozessor Scala-Version, 2.12.0
um die Snowflake-Version, die Sie verwenden möchten, und spark_3.3
um die Feature-Prozessor Spark-Version.
from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[SnowflakeDataSource()], output="
feature-group-arn>
", target_stores=["OfflineStore"], ) def transform(input_df): return input_df
Beispiele für benutzerdefinierte Datenquellen von Databricks (JDBC)
Spark kann mithilfe des Databricks JDBC-Treibers Daten aus Databricks lesen. Informationen zum Databricks-JDBC-Treiber finden Sie unter Konfigurieren der Databricks-ODBC- und JDBC-Treiber in der
Anmerkung
Sie können Daten aus jeder anderen Datenbank lesen, indem Sie den entsprechenden JDBC-Treiber in den Spark-Klassenpfad aufnehmen. Weitere Informationen finden Sie unter JDBC in anderen Datenbanken
Um die benutzerdefinierte Databricks-Datenquellenklasse zu erstellen, müssen Sie die read_data
Methode aus dem überschreiben Benutzerdefinierte Datenquellen und das JDBC-JAR zum Spark-Klassenpfad hinzufügen.
Um eine Verbindung mit einer Databricks-Datenquelle herzustellen, benötigen Sie:
-
Databricks-URL (
)databricks-url
Informationen zu Ihrer Databricks-URL finden Sie unter Erstellen der Verbindungs-URL für den Databricks-Treiber
in der Databricks-Dokumentation. -
Persönliches Zugriffstoken von Databricks (
)personal-access-token
Informationen zu Ihrem Databricks-Zugriffstoken finden Sie unter Authentifizierung mit dem persönlichen Zugriffstoken von Databricks
in der Databricks-Dokumentation. -
Name des Datenkatalogs (
)db-catalog
Informationen zu Ihrem Databricks-Katalognamen finden Sie unter Katalogname
in der Databricks-Dokumentation. -
Schemaname (
)db-schema
Informationen zu Ihrem Databricks-Schemanamen finden Sie unter Schemaname in der Databricks-Dokumentation
. -
Tabellenname (
)db-table-name
Informationen zu Ihrem Databricks-Tabellennamen finden Sie unter Tabellenname
in derDatabricks-Dokumentation. -
(Optional) Wenn Sie Secrets Manager verwenden, benötigen Sie den geheimen Namen (
), unter dem Sie Ihren Databricks-Zugangsbenutzernamen und Ihr Passwort auf Secrets Manager speichern.secret-databricks-account-info
Informationen zu Secrets Manager finden Sie unter Find Secrets AWS Secrets Manager im AWS Secrets Manager Benutzerhandbuch.
-
AWS-Region (
)your-region
Informationen zum Abrufen des Regionsnamens Ihrer aktuellen Sitzung mithilfe des SDK for Python (Boto3) finden Sie unter region_name
in der Boto3-Dokumentation.
Das folgende Beispiel zeigt, wie Sie die JDBC-URL und das persönliche Zugriffstoken aus Secrets Manager abrufen und die read_data
für Ihre benutzerdefinierte Datenquellenklasse, DatabricksDataSource
überschreiben.
from sagemaker.feature_store.feature_processor import PySparkDataSource import json import boto3 class DatabricksDataSource(PySparkDataSource): data_source_name = "Databricks" data_source_unique_id = "
databricks-url
" def read_data(self, spark, params): secret_name = "secret-databricks-account-info
" region_name = "your-region
" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) jdbc_url = secrets["jdbcurl"].replace("personal-access-token
", secrets['pwd']) return spark.read.format("jdbc") \ .option("url", jdbc_url) \ .option("dbtable","`db-catalog
`.`db-schema
`.`db-table-name
`") \ .option("driver", "com.simba.spark.jdbc.Driver") \ .load()
Das folgende Beispiel zeigt, wie der JDBC-Treiber jar,
, auf Amazon S3 hochgeladen wird, um ihn dem Spark-Klassenpfad hinzuzufügen. Informationen zum Herunterladen des Spark-JDBC-Treibers (jdbc-jar-file-name
.jar
) von Databricks finden Sie unter JDBC-Treiber herunterladenjdbc-jar-file-name
.jar
from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[DatabricksDataSource()], output=
feature-group-arn
, target_stores=["OfflineStore"], spark_config={"spark.jars": "s3://your-bucket-name
/your-bucket-prefix
/jdbc-jar-file-name
.jar"} ) def transform(input_df): return input_df
Um den Featureprozessor-Job remote auszuführen, müssen Sie die JAR-Dateien SparkConfig
durch Definition bereitstellen und an den @remote
Decorator übergeben.
from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars": "s3://
your-bucket-name
/your-bucket-prefix
/jdbc-jar-file-name
.jar" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[DatabricksDataSource()], output="feature-group-arn
", target_stores=["OfflineStore"], ) def transform(input_df): return input_df
Beispiele für das Streamen benutzerdefinierter Datenquellen
Sie können eine Verbindung zu Streaming-Datenquellen wie Amazon Kinesis herstellen und Transformationen mit Spark Structured Streaming erstellen, um aus Streaming-Datenquellen zu lesen. Informationen zum Kinesis-Konnektor finden Sie unter Kinesis-Konnektor für Spark Structured Streaming
Um die benutzerdefinierte Amazon Kinesis Kinesis-Datenquellenklasse zu erstellen, müssen Sie die BaseDataSource
Klasse erweitern und die read_data
Methode von Benutzerdefinierte Datenquellen überschreiben.
Zur Herstellung einer Verbindung mit einem Amazon Kinesis Kinesis-Daten-Stream benötigen Sie:
-
Kinesis ARN (
)kinesis-resource-arn
Informationen zu Kinesis Data Stream ARNs finden Sie unter Amazon Resource Names (ARNs) for Kinesis Data Streams im Amazon Kinesis Developer Guide.
-
Kinesis-Datenstreamname (
)kinesis-stream-name
-
AWS-Region (
)your-region
Informationen zum Abrufen des Regionsnamens Ihrer aktuellen Sitzung mithilfe des SDK for Python (Boto3) finden Sie unter region_name
in der Boto3-Dokumentation.
from sagemaker.feature_store.feature_processor import BaseDataSource from sagemaker.feature_store.feature_processor import feature_processor class KinesisDataSource(BaseDataSource): data_source_name = "Kinesis" data_source_unique_id = "
kinesis-resource-arn
" def read_data(self, spark, params): return spark.readStream.format("kinesis") \ .option("streamName", "kinesis-stream-name
") \ .option("awsUseInstanceProfile", "false") \ .option("endpointUrl", "https://kinesis..amazonaws.com") .load()
your-region
Das folgende Beispiel zeigt, wie Sie eine Verbindung KinesisDataSource
zu Ihrem feature_processor
Dekorateur herstellen.
from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig import feature_store_pyspark.FeatureStoreManager as fsm def ingest_micro_batch_into_fg(input_df, epoch_id): feature_group_arn = "
feature-group-arn
" fsm.FeatureStoreManager().ingest_data( input_data_frame = input_df, feature_group_arn = feature_group_arn ) @remote( spark_config=SparkConfig( configuration={ "Classification": "spark-defaults", "Properties":{ "spark.sql.streaming.schemaInference": "true", "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2" } } ), instance_type="ml.m5.2xlarge", max_runtime_in_seconds=2419200 # 28 days ) @feature_processor( inputs=[KinesisDataSource()], output="feature-group-arn
" ) def transform(input_df): output_stream = ( input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)") .writeStream.foreachBatch(ingest_micro_batch_into_fg) .trigger(processingTime="1 minute") .option("checkpointLocation", "s3a://checkpoint-path
") .start() ) output_stream.awaitTermination()
Im obigen Beispielcode verwenden wir einige Spark-Optionen für strukturiertes Streaming, während wir Mikrobatches in Ihre Feature-Gruppe streamen. Eine vollständige Liste der Optionen finden Sie im Leitfaden zur Programmierung von strukturiertem Streaming
-
Der
foreachBatch
Sink-Modus ist eine Funktion, mit der Sie Operationen und Schreiblogik auf die Ausgabedaten jedes Mikrobatches einer Streaming-Abfrage anwenden können.Informationen dazu finden Sie unter
foreachBatch
Using Foreach und ForeachBatchim Apache Spark Structured Streaming Programming Guide. -
Die
checkpointLocation
-Option speichert regelmäßig den Status der Streaming-Anwendung. Das Streaming-Protokoll wird am Checkpoint gespeichert
.s3a://checkpoint-path
Informationen zu dieser
checkpointLocation
Option finden Sie unter Wiederherstellung nach Fehlern mit Checkpointingin der strukturierten Streaming-Programmierung von Apache Spark. -
Die
trigger
Einstellung definiert, wie oft die Mikro-Batch-Verarbeitung in einer Streaming-Anwendung ausgelöst wird. In diesem Beispiel wird der Triggertyp „Verarbeitungszeit“ mit Mikrobatch-Intervallen von einer Minute verwendet, die vontrigger(processingTime="1 minute")
spezifiziert sind. Für das Backfill aus einer Stream-Quelle können Sie den Triggertyp Available-now verwenden, der vontrigger(availableNow=True)
spezifiziert ist.Eine vollständige Liste der
trigger
-Typen finden Sie unter Triggerin der strukturierten Streaming-Programmierung von Apache Spark.
Kontinuierliches Streaming und automatische Wiederholungsversuche mit ereignisbasierten Triggern
Der Feature Processor verwendet SageMaker Training als Recheninfrastruktur und hat eine maximale Laufzeit von 28 Tagen. Sie können ereignisbasierte Trigger verwenden, um Ihr kontinuierliches Streaming über einen längeren Zeitraum zu verlängern und vorübergehende Ausfälle zu beheben. Weitere Informationen zu zeitplan- und ereignisbasierten Ausführungen finden Sie unter Geplante und ereignisbasierte Ausführungen für Feature-Prozessor-Pipelines.
Im Folgenden finden Sie ein Beispiel für die Einrichtung eines ereignisbasierten Triggers, um die Streaming-Featureprozessor-Pipeline kontinuierlich am Laufen zu halten. Dabei wird die im vorherigen Beispiel definierte Streaming-Transformationsfunktion verwendet. Eine Ziel-Pipeline kann so konfiguriert werden, dass sie ausgelöst wird, wenn bei der STOPPED
Ausführung oder FAILED
Quellpipeline-Ereignis eintritt. Beachten Sie, dass dieselbe Pipeline als Quelle und Ziel verwendet wird, sodass sie kontinuierlich ausgeführt wird.
import sagemaker.feature_store.feature_processor as fp from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus streaming_pipeline_name = "
streaming-pipeline
" streaming_pipeline_arn = fp.to_pipeline( pipeline_name = streaming_pipeline_name, step = transform # defined in previous section ) fp.put_trigger( source_pipeline_events=FeatureProcessorPipelineEvents( pipeline_name=source_pipeline_name, pipeline_execution_status=[ FeatureProcessorPipelineExecutionStatus.STOPPED, FeatureProcessorPipelineExecutionStatus.FAILED] ), target_pipeline=target_pipeline_name )