Wählen Sie Ihre Cookie-Einstellungen aus

Wir verwenden essentielle Cookies und ähnliche Tools, die für die Bereitstellung unserer Website und Services erforderlich sind. Wir verwenden Performance-Cookies, um anonyme Statistiken zu sammeln, damit wir verstehen können, wie Kunden unsere Website nutzen, und Verbesserungen vornehmen können. Essentielle Cookies können nicht deaktiviert werden, aber Sie können auf „Anpassen“ oder „Ablehnen“ klicken, um Performance-Cookies abzulehnen.

Wenn Sie damit einverstanden sind, verwenden AWS und zugelassene Drittanbieter auch Cookies, um nützliche Features der Website bereitzustellen, Ihre Präferenzen zu speichern und relevante Inhalte, einschließlich relevanter Werbung, anzuzeigen. Um alle nicht notwendigen Cookies zu akzeptieren oder abzulehnen, klicken Sie auf „Akzeptieren“ oder „Ablehnen“. Um detailliertere Entscheidungen zu treffen, klicken Sie auf „Anpassen“.

Beispiele für benutzerdefinierte Datenquellen

Fokusmodus
Beispiele für benutzerdefinierte Datenquellen - Amazon SageMaker KI

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Die vorliegende Übersetzung wurde maschinell erstellt. Im Falle eines Konflikts oder eines Widerspruchs zwischen dieser übersetzten Fassung und der englischen Fassung (einschließlich infolge von Verzögerungen bei der Übersetzung) ist die englische Fassung maßgeblich.

Dieser Abschnitt enthält Beispiele für Implementierungen benutzerdefinierter Datenquellen für Feature-Prozessoren. Weitere Informationen zu gemeinsamen Datenquellen finden Sie unter Benutzerdefinierte Datenquellen.

Sicherheit ist eine gemeinsame Verantwortung zwischen unseren Kunden AWS und unseren Kunden. AWS ist verantwortlich für den Schutz der Infrastruktur, auf der die Dienste in der ausgeführt AWS Cloud werden. Kunden sind für alle erforderlichen Sicherheitskonfigurations- und Verwaltungsaufgaben verantwortlich. Beispielsweise sollten Geheimnisse wie Zugangsdaten für Datenspeicher in Ihren benutzerdefinierten Datenquellen nicht fest codiert sein. Sie können diese Anmeldeinformationen AWS Secrets Manager zur Verwaltung verwenden. Informationen zu Secrets Manager finden Sie unter Was ist AWS Secrets Manager? im AWS Secrets Manager Benutzerhandbuch. In den folgenden Beispielen wird Secrets Manager für Ihre Anmeldeinformationen verwendet.

Beispiele für benutzerdefinierte Amazon Redshift Clusters (JDBC)-Datenquellen

Amazon Redshift bietet einen JDBC-Treiber, der zum Lesen von Daten mit Spark verwendet werden kann. Informationen zum Herunterladen des Amazon Redshift JDBC-Treibers finden Sie unter Herunterladen des Amazon Redshift JDBC-Treibers, Version 2.1.

Um die benutzerdefinierte Amazon Redshift Redshift-Datenquellenklasse zu erstellen, müssen Sie die read_data Methode aus der Benutzerdefinierte Datenquellen überschreiben.

Um eine Verbindung mit einem Amazon Redshift Redshift-Cluster herzustellen, benötigen Sie:

  • Amazon Redshift JDBC-URL (jdbc-url)

    Informationen zum Abrufen Ihrer Amazon Redshift JDBC-URL finden Sie unter Getting the JDBC URL im Datenbankentwicklerhandbuch zu Amazon Redshift.

  • Amazon Redshift Redshift-Benutzername (redshift-user) und Passwort (redshift-password)

    Informationen zum Erstellen und Verwalten von Datenbankbenutzern mithilfe der Amazon-Redshift-SQL-Befehle finden Sie unter Benutzer im Amazon-Redshift-Dabase-Entwicklerhandbuch.

  • Name der Amazon-Redshift-Tabelle (redshift-table-name)

    Informationen zum Erstellen einer Tabelle mit einigen Beispielen finden Sie unter CREATE TABLE im Datenbankentwicklerhandbuch zu Amazon Redshift.

  • (Optional) Wenn Sie Secrets Manager verwenden, benötigen Sie den geheimen Namen (secret-redshift-account-info), unter dem Sie Ihren Amazon Redshift Redshift-Zugangsbenutzernamen und Ihr Passwort auf Secrets Manager speichern.

    Informationen zu Secrets Manager finden Sie unter Find Secrets AWS Secrets Manager im AWS Secrets Manager Benutzerhandbuch.

  • AWS-Region (your-region)

    Informationen zum Abrufen des Regionsnamens Ihrer aktuellen Sitzung mithilfe des SDK for Python (Boto3) finden Sie unter region_name in der Boto3-Dokumentation.

Das folgende Beispiel zeigt, wie Sie die JDBC-URL und das persönliche Zugriffstoken aus Secrets Manager abrufen und die read_data für Ihre benutzerdefinierte Datenquellenklasse, DatabricksDataSource überschreiben.

from sagemaker.feature_store.feature_processor import PySparkDataSource import json import boto3 class RedshiftDataSource(PySparkDataSource): data_source_name = "Redshift" data_source_unique_id = "redshift-resource-arn" def read_data(self, spark, params): url = "jdbc-url?user=redshift-user&password=redshift-password" aws_iam_role_arn = "redshift-command-access-role" secret_name = "secret-redshift-account-info" region_name = "your-region" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]).replace("redshift-user", secrets['username']).replace("redshift-password", secrets['password']) return spark.read \ .format("jdbc") \ .option("url", url) \ .option("driver", "com.amazon.redshift.Driver") \ .option("dbtable", "redshift-table-name") \ .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \ .option("aws_iam_role", aws_iam_role_arn) \ .load()

Das folgende Beispiel zeigt, wie Sie eine Verbindung RedshiftDataSource zu Ihrem feature_processor Dekorateur herstellen.

from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[RedshiftDataSource()], output="feature-group-arn", target_stores=["OfflineStore"], spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"} ) def transform(input_df): return input_df

Um den Featureprozessor-Job remote auszuführen, müssen Sie den JDBC-Treiber bereitstellen, indem Sie ihn definieren SparkConfig und an den @remote Decorator übergeben.

from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[RedshiftDataSource()], output="feature-group-arn", target_stores=["OfflineStore"], ) def transform(input_df): return input_df

Beispiele für benutzerdefinierte Snowflake-Datenquellen

Snowflake bietet einen Spark-Konnektor, der für Ihren feature_processor Dekorateur verwendet werden kann. Informationen zum Snowflake-Konnektor für Spark finden Sie unter Snowflake-Konnektor für Spark in der Snowflake-Dokumentation.

Um die benutzerdefinierte Snowflake-Datenquellenklasse zu erstellen, müssen Sie die read_data Methode aus dem Benutzerdefinierte Datenquellen überschreiben und die Spark-Connector-Pakete zum Spark-Klassenpfad hinzufügen.

Um eine Verbindung mit einer Snowflake-Datenquelle herzustellen, benötigen Sie:

  • Snowflake-URL (sf-url)

    Informationen URLs zum Zugriff auf Snowflake-Weboberflächen finden Sie unter Konto-Identifikatoren in der Snowflake-Dokumentation.

  • Snowflake-Datenbank (sf-database)

    Informationen zum Abrufen des Namens Ihrer Datenbank mit Snowflake finden Sie unter CURRENT_DATABASE in der Snowflake-Dokumentation.

  • Snowflake-Datenbankschema (sf-schema)

    Informationen zum Abrufen des Namens Ihres Schemas mithilfe von Snowflake finden Sie unter CURRENT_SCHEMA in der Snowflake-Dokumentation.

  • Snowflake-Warehouse (sf-warehouse)

    Informationen zum Abrufen des Namens Ihres Warehouse mithilfe von Snowflake finden Sie unter CURRENT_WAREHOUSE in der Snowflake-Dokumentation.

  • Name der Snowflake-Tabelle (sf-table-name)

  • (Optional) Wenn Sie Secrets Manager verwenden, benötigen Sie den geheimen Namen (secret-snowflake-account-info), unter dem Sie Ihren Snowflake-Zugriffsbenutzernamen und Ihr Passwort in Secrets Manager speichern.

    Informationen zu Secrets Manager finden Sie unter Find Secrets AWS Secrets Manager im AWS Secrets Manager Benutzerhandbuch.

  • AWS-Region (your-region)

    Informationen zum Abrufen des Regionsnamens Ihrer aktuellen Sitzung mithilfe des SDK for Python (Boto3) finden Sie unter region_name in der Boto3-Dokumentation.

Das folgende Beispiel zeigt, wie Sie den Snowflake-Benutzernamen und das Kennwort aus Secrets Manager abrufen und die read_data Funktion für Ihre benutzerdefinierte Datenquellenklasse überschreiben. SnowflakeDataSource

from sagemaker.feature_store.feature_processor import PySparkDataSource from sagemaker.feature_store.feature_processor import feature_processor import json import boto3 class SnowflakeDataSource(PySparkDataSource): sf_options = { "sfUrl" : "sf-url", "sfDatabase" : "sf-database", "sfSchema" : "sf-schema", "sfWarehouse" : "sf-warehouse", } data_source_name = "Snowflake" data_source_unique_id = "sf-url" def read_data(self, spark, params): secret_name = "secret-snowflake-account-info" region_name = "your-region" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) self.sf_options["sfUser"] = secrets.get("username") self.sf_options["sfPassword"] = secrets.get("password") return spark.read.format("net.snowflake.spark.snowflake") \ .options(**self.sf_options) \ .option("dbtable", "sf-table-name") \ .load()

Das folgende Beispiel zeigt, wie Sie eine Verbindung SnowflakeDataSource zu Ihrem feature_processor Dekorateur herstellen.

from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[SnowflakeDataSource()], output=feature-group-arn, target_stores=["OfflineStore"], spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"} ) def transform(input_df): return input_df

Um den Feature-Prozessor-Job remote auszuführen, müssen Sie die Pakete per Definition SparkConfig bereitstellen und an den @remote Decorator übergeben. Bei den Spark-Paketen im folgenden Beispiel handelt es sich um die spark-snowflake_2.12 Feature-Prozessor Scala-Version, 2.12.0 um die Snowflake-Version, die Sie verwenden möchten, und spark_3.3 um die Feature-Prozessor Spark-Version.

from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[SnowflakeDataSource()], output="feature-group-arn>", target_stores=["OfflineStore"], ) def transform(input_df): return input_df

Beispiele für benutzerdefinierte Datenquellen von Databricks (JDBC)

Spark kann mithilfe des Databricks JDBC-Treibers Daten aus Databricks lesen. Informationen zum Databricks-JDBC-Treiber finden Sie unter Konfigurieren der Databricks-ODBC- und JDBC-Treiber in der Databricks-Dokumentation.

Anmerkung

Sie können Daten aus jeder anderen Datenbank lesen, indem Sie den entsprechenden JDBC-Treiber in den Spark-Klassenpfad aufnehmen. Weitere Informationen finden Sie unter JDBC in anderen Datenbanken in Apache Spark SQL.

Um die benutzerdefinierte Databricks-Datenquellenklasse zu erstellen, müssen Sie die read_data Methode aus dem überschreiben Benutzerdefinierte Datenquellen und das JDBC-JAR zum Spark-Klassenpfad hinzufügen.

Um eine Verbindung mit einer Databricks-Datenquelle herzustellen, benötigen Sie:

  • Databricks-URL (databricks-url)

    Informationen zu Ihrer Databricks-URL finden Sie unter Erstellen der Verbindungs-URL für den Databricks-Treiber in der Databricks-Dokumentation.

  • Persönliches Zugriffstoken von Databricks (personal-access-token)

    Informationen zu Ihrem Databricks-Zugriffstoken finden Sie unter Authentifizierung mit dem persönlichen Zugriffstoken von Databricks in der Databricks-Dokumentation.

  • Name des Datenkatalogs (db-catalog)

    Informationen zu Ihrem Databricks-Katalognamen finden Sie unter Katalogname in der Databricks-Dokumentation.

  • Schemaname (db-schema)

    Informationen zu Ihrem Databricks-Schemanamen finden Sie unter Schemaname in der Databricks-Dokumentation.

  • Tabellenname (db-table-name)

    Informationen zu Ihrem Databricks-Tabellennamen finden Sie unter Tabellenname in derDatabricks-Dokumentation.

  • (Optional) Wenn Sie Secrets Manager verwenden, benötigen Sie den geheimen Namen (secret-databricks-account-info), unter dem Sie Ihren Databricks-Zugangsbenutzernamen und Ihr Passwort auf Secrets Manager speichern.

    Informationen zu Secrets Manager finden Sie unter Find Secrets AWS Secrets Manager im AWS Secrets Manager Benutzerhandbuch.

  • AWS-Region (your-region)

    Informationen zum Abrufen des Regionsnamens Ihrer aktuellen Sitzung mithilfe des SDK for Python (Boto3) finden Sie unter region_name in der Boto3-Dokumentation.

Das folgende Beispiel zeigt, wie Sie die JDBC-URL und das persönliche Zugriffstoken aus Secrets Manager abrufen und die read_data für Ihre benutzerdefinierte Datenquellenklasse, DatabricksDataSource überschreiben.

from sagemaker.feature_store.feature_processor import PySparkDataSource import json import boto3 class DatabricksDataSource(PySparkDataSource): data_source_name = "Databricks" data_source_unique_id = "databricks-url" def read_data(self, spark, params): secret_name = "secret-databricks-account-info" region_name = "your-region" session = boto3.session.Session() sm_client = session.client( service_name='secretsmanager', region_name=region_name, ) secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"]) jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd']) return spark.read.format("jdbc") \ .option("url", jdbc_url) \ .option("dbtable","`db-catalog`.`db-schema`.`db-table-name`") \ .option("driver", "com.simba.spark.jdbc.Driver") \ .load()

Das folgende Beispiel zeigt, wie der JDBC-Treiber jar, jdbc-jar-file-name.jar, auf Amazon S3 hochgeladen wird, um ihn dem Spark-Klassenpfad hinzuzufügen. Informationen zum Herunterladen des Spark-JDBC-Treibers (jdbc-jar-file-name.jar) von Databricks finden Sie unter JDBC-Treiber herunterladen auf der Databricks-Website.

from sagemaker.feature_store.feature_processor import feature_processor @feature_processor( inputs=[DatabricksDataSource()], output=feature-group-arn, target_stores=["OfflineStore"], spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"} ) def transform(input_df): return input_df

Um den Featureprozessor-Job remote auszuführen, müssen Sie die JAR-Dateien SparkConfig durch Definition bereitstellen und an den @remote Decorator übergeben.

from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig config = { "Classification": "spark-defaults", "Properties": { "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar" } } @remote( spark_config=SparkConfig(configuration=config), instance_type="ml.m5.2xlarge", ) @feature_processor( inputs=[DatabricksDataSource()], output="feature-group-arn", target_stores=["OfflineStore"], ) def transform(input_df): return input_df

Beispiele für das Streamen benutzerdefinierter Datenquellen

Sie können eine Verbindung zu Streaming-Datenquellen wie Amazon Kinesis herstellen und Transformationen mit Spark Structured Streaming erstellen, um aus Streaming-Datenquellen zu lesen. Informationen zum Kinesis-Konnektor finden Sie unter Kinesis-Konnektor für Spark Structured Streaming in. GitHub Weitere Informationen finden Sie unter Was ist Amazon Kinesis Data Streams? im Entwicklerhandbuch für Amazon Kinesis Data Streams.

Um die benutzerdefinierte Amazon Kinesis Kinesis-Datenquellenklasse zu erstellen, müssen Sie die BaseDataSource Klasse erweitern und die read_data Methode von Benutzerdefinierte Datenquellen überschreiben.

Zur Herstellung einer Verbindung mit einem Amazon Kinesis Kinesis-Daten-Stream benötigen Sie:

  • Kinesis ARN (kinesis-resource-arn)

    Informationen zu Kinesis Data Stream ARNs finden Sie unter Amazon Resource Names (ARNs) for Kinesis Data Streams im Amazon Kinesis Developer Guide.

  • Kinesis-Datenstreamname (kinesis-stream-name)

  • AWS-Region (your-region)

    Informationen zum Abrufen des Regionsnamens Ihrer aktuellen Sitzung mithilfe des SDK for Python (Boto3) finden Sie unter region_name in der Boto3-Dokumentation.

from sagemaker.feature_store.feature_processor import BaseDataSource from sagemaker.feature_store.feature_processor import feature_processor class KinesisDataSource(BaseDataSource): data_source_name = "Kinesis" data_source_unique_id = "kinesis-resource-arn" def read_data(self, spark, params): return spark.readStream.format("kinesis") \ .option("streamName", "kinesis-stream-name") \ .option("awsUseInstanceProfile", "false") \ .option("endpointUrl", "https://kinesis.your-region.amazonaws.com") .load()

Das folgende Beispiel zeigt, wie Sie eine Verbindung KinesisDataSource zu Ihrem feature_processor Dekorateur herstellen.

from sagemaker.remote_function import remote from sagemaker.remote_function.spark_config import SparkConfig import feature_store_pyspark.FeatureStoreManager as fsm def ingest_micro_batch_into_fg(input_df, epoch_id): feature_group_arn = "feature-group-arn" fsm.FeatureStoreManager().ingest_data( input_data_frame = input_df, feature_group_arn = feature_group_arn ) @remote( spark_config=SparkConfig( configuration={ "Classification": "spark-defaults", "Properties":{ "spark.sql.streaming.schemaInference": "true", "spark.jars.packages": "com.roncemer.spark/spark-sql-kinesis_2.13/1.2.2_spark-3.2" } } ), instance_type="ml.m5.2xlarge", max_runtime_in_seconds=2419200 # 28 days ) @feature_processor( inputs=[KinesisDataSource()], output="feature-group-arn" ) def transform(input_df): output_stream = ( input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)") .writeStream.foreachBatch(ingest_micro_batch_into_fg) .trigger(processingTime="1 minute") .option("checkpointLocation", "s3a://checkpoint-path") .start() ) output_stream.awaitTermination()

Im obigen Beispielcode verwenden wir einige Spark-Optionen für strukturiertes Streaming, während wir Mikrobatches in Ihre Feature-Gruppe streamen. Eine vollständige Liste der Optionen finden Sie im Leitfaden zur Programmierung von strukturiertem Streaming in der Apache-Spark-Dokumentation.

  • Der foreachBatch Sink-Modus ist eine Funktion, mit der Sie Operationen und Schreiblogik auf die Ausgabedaten jedes Mikrobatches einer Streaming-Abfrage anwenden können.

    Informationen dazu finden Sie unter foreachBatch Using Foreach und ForeachBatch im Apache Spark Structured Streaming Programming Guide.

  • Die checkpointLocation-Option speichert regelmäßig den Status der Streaming-Anwendung. Das Streaming-Protokoll wird am Checkpoint gespeichert s3a://checkpoint-path.

    Informationen zu dieser checkpointLocation Option finden Sie unter Wiederherstellung nach Fehlern mit Checkpointing in der strukturierten Streaming-Programmierung von Apache Spark.

  • Die trigger Einstellung definiert, wie oft die Mikro-Batch-Verarbeitung in einer Streaming-Anwendung ausgelöst wird. In diesem Beispiel wird der Triggertyp „Verarbeitungszeit“ mit Mikrobatch-Intervallen von einer Minute verwendet, die von trigger(processingTime="1 minute") spezifiziert sind. Für das Backfill aus einer Stream-Quelle können Sie den Triggertyp Available-now verwenden, der von trigger(availableNow=True) spezifiziert ist.

    Eine vollständige Liste der trigger-Typen finden Sie unter Trigger in der strukturierten Streaming-Programmierung von Apache Spark.

Kontinuierliches Streaming und automatische Wiederholungsversuche mit ereignisbasierten Triggern

Der Feature Processor verwendet SageMaker Training als Recheninfrastruktur und hat eine maximale Laufzeit von 28 Tagen. Sie können ereignisbasierte Trigger verwenden, um Ihr kontinuierliches Streaming über einen längeren Zeitraum zu verlängern und vorübergehende Ausfälle zu beheben. Weitere Informationen zu zeitplan- und ereignisbasierten Ausführungen finden Sie unter Geplante und ereignisbasierte Ausführungen für Feature-Prozessor-Pipelines.

Im Folgenden finden Sie ein Beispiel für die Einrichtung eines ereignisbasierten Triggers, um die Streaming-Featureprozessor-Pipeline kontinuierlich am Laufen zu halten. Dabei wird die im vorherigen Beispiel definierte Streaming-Transformationsfunktion verwendet. Eine Ziel-Pipeline kann so konfiguriert werden, dass sie ausgelöst wird, wenn bei der STOPPED Ausführung oder FAILED Quellpipeline-Ereignis eintritt. Beachten Sie, dass dieselbe Pipeline als Quelle und Ziel verwendet wird, sodass sie kontinuierlich ausgeführt wird.

import sagemaker.feature_store.feature_processor as fp from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvent from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus streaming_pipeline_name = "streaming-pipeline" streaming_pipeline_arn = fp.to_pipeline( pipeline_name = streaming_pipeline_name, step = transform # defined in previous section ) fp.put_trigger( source_pipeline_events=FeatureProcessorPipelineEvents( pipeline_name=source_pipeline_name, pipeline_execution_status=[ FeatureProcessorPipelineExecutionStatus.STOPPED, FeatureProcessorPipelineExecutionStatus.FAILED] ), target_pipeline=target_pipeline_name )
DatenschutzNutzungsbedingungen für die WebsiteCookie-Einstellungen
© 2025, Amazon Web Services, Inc. oder Tochtergesellschaften. Alle Rechte vorbehalten.