
Custom data source examples

This section provides examples of custom data source implementations for Feature Processors. For more information on custom data sources, see Custom data sources.

Security is a shared responsibility between AWS and our customers. AWS is responsible for protecting the infrastructure that runs the services in the AWS Cloud. Customers are responsible for all of their necessary security configuration and management tasks. For example, secrets such as access credentials to data stores should not be hard coded in your custom data sources. You can use AWS Secrets Manager to manage these credentials. For information about Secrets Manager, see What is AWS Secrets Manager? in the AWS Secrets Manager User Guide. The following examples use Secrets Manager to retrieve your credentials.
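
For example, you can store your credentials as a single JSON secret and read them back at runtime. The following is a minimal sketch using the SDK for Python (Boto3); the secret name, Region, and values are placeholders, and the JSON keys match the ones read back in the Amazon Redshift example later in this section.

import json
import boto3

# Minimal sketch: store data store credentials as one JSON secret in AWS Secrets Manager.
# The secret name, Region, and values are placeholders.
session = boto3.session.Session()
sm_client = session.client(service_name="secretsmanager", region_name="your-region")

sm_client.create_secret(
    Name="secret-redshift-account-info",
    SecretString=json.dumps({
        "jdbcurl": "jdbc-url",          # your Amazon Redshift JDBC URL
        "username": "redshift-user",
        "password": "redshift-password",
    }),
)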

Amazon Redshift Clusters (JDBC) custom data source examples

Amazon Redshift offers a JDBC driver that can be used to read data with Spark. For information about how to download the Amazon Redshift JDBC driver, see Download the Amazon Redshift JDBC driver, version 2.1.

To create the custom Amazon Redshift data source class, you will need to override the read_data method described in Custom data sources.

To connect to an Amazon Redshift cluster, you need your:

  • Amazon Redshift JDBC URL (jdbc-url)

    For information about obtaining your Amazon Redshift JDBC URL, see Getting the JDBC URL in the Amazon Redshift Database Developer Guide.

  • Amazon Redshift user name (redshift-user) and password (redshift-password)

    For information about how to create and manage database users using the Amazon Redshift SQL commands, see Users in the Amazon Redshift Database Developer Guide.

  • Amazon Redshift table name (redshift-table-name)

    For information about how to create a table with some examples, see CREATE TABLE in the Amazon Redshift Database Developer Guide.

  • (Optional) If using Secrets Manager, you’ll need the secret name (secret-redshift-account-info) where you store your Amazon Redshift access user name and password in Secrets Manager.

    For information about Secrets Manager, see Find secrets in AWS Secrets Manager in the AWS Secrets Manager User Guide.

  • AWS Region (your-region)

    For information about obtaining your current session’s region name using SDK for Python (Boto3), see region_name in the Boto3 documentation.

The following example demonstrates how to retrieve the JDBC URL, user name, and password from Secrets Manager and override the read_data method for your custom data source class, RedshiftDataSource.

from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3

class RedshiftDataSource(PySparkDataSource):

    data_source_name = "Redshift"
    data_source_unique_id = "redshift-resource-arn"

    def read_data(self, spark, params):
        url = "jdbc-url?user=redshift-user&password=redshift-password"
        aws_iam_role_arn = "redshift-command-access-role"
        secret_name = "secret-redshift-account-info"
        region_name = "your-region"
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        # Substitute the placeholders with the values stored in Secrets Manager.
        jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]) \
            .replace("redshift-user", secrets['username']) \
            .replace("redshift-password", secrets['password'])
        return spark.read \
            .format("jdbc") \
            .option("url", jdbc_url) \
            .option("driver", "com.amazon.redshift.Driver") \
            .option("dbtable", "redshift-table-name") \
            .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \
            .option("aws_iam_role", aws_iam_role_arn) \
            .load()

The following example shows how to connect RedshiftDataSource to your feature_processor decorator.

from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df

To run the feature processor job remotely, you need to provide the JDBC driver by defining SparkConfig and passing it to the @remote decorator.

from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
        "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df

Snowflake custom data source examples

Snowflake provides a Spark connector that you can use with your feature_processor decorator. For information about the Snowflake connector for Spark, see Snowflake Connector for Spark in the Snowflake documentation.

To create the custom Snowflake data source class, you will need to override the read_data method described in Custom data sources and add the Spark connector packages to the Spark classpath.

To connect to a Snowflake data source, you need:

  • Snowflake URL (sf-url)

    For information about URLs for accessing Snowflake web interfaces, see Account Identifiers in the Snowflake documentation.

  • Snowflake database (sf-database)

    For information about obtaining the name of your database using Snowflake, see CURRENT_DATABASE in the Snowflake documentation.

  • Snowflake database schema (sf-schema)

    For information about obtaining the name of your schema using Snowflake, see CURRENT_SCHEMA in the Snowflake documentation.

  • Snowflake warehouse (sf-warehouse)

    For information about obtaining the name of your warehouse using Snowflake, see CURRENT_WAREHOUSE in the Snowflake documentation.

  • Snowflake table name (sf-table-name)

  • (Optional) If using Secrets Manager, you’ll need the secret name (secret-snowflake-account-info) where you store your Snowflake access user name and password in Secrets Manager.

    For information about Secrets Manager, see Find secrets in AWS Secrets Manager in the AWS Secrets Manager User Guide.

  • AWS Region (your-region)

    For information about obtaining your current session’s region name using SDK for Python (Boto3), see region_name in the Boto3 documentation.

The following example demonstrates how to retrieve the Snowflake user name and password from Secrets Manager and override the read_data method for your custom data source class, SnowflakeDataSource.

from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3

class SnowflakeDataSource(PySparkDataSource):

    sf_options = {
        "sfUrl" : "sf-url",
        "sfDatabase" : "sf-database",
        "sfSchema" : "sf-schema",
        "sfWarehouse" : "sf-warehouse",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "sf-url"

    def read_data(self, spark, params):
        secret_name = "secret-snowflake-account-info"
        region_name = "your-region"
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        return spark.read.format("net.snowflake.spark.snowflake") \
            .options(**self.sf_options) \
            .option("dbtable", "sf-table-name") \
            .load()

The following example shows how to connect SnowflakeDataSource to your feature_processor decorator.

from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df

To run the feature processor job remotely, you need to provide the packages by defining SparkConfig and passing it to the @remote decorator. In the package coordinate net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3 used in the following example, 2.12 is the Scala version used by the Feature Processor, 2.12.0 is the version of the Snowflake connector you wish to use, and 3.3 is the Spark version used by the Feature Processor.

from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
        "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df

Databricks (JDBC) custom data source examples

Spark can read data from Databricks by using the Databricks JDBC driver. For information about the Databricks JDBC driver, see Configure the Databricks ODBC and JDBC drivers in the Databricks documentation.

Note

You can read data from any other database by including the corresponding JDBC driver in the Spark classpath. For more information, see JDBC To Other Databases in the Spark SQL Guide.
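
For example, a hypothetical PostgreSQL source could be read the same way. The following is a minimal sketch; the URL, table, and credentials are placeholders, and it assumes spark is the Spark session passed to read_data and that the PostgreSQL JDBC driver has been added to the Spark classpath (for example, through spark.jars.packages with org.postgresql:postgresql:42.6.0).

# Minimal sketch: read from a hypothetical PostgreSQL database over JDBC.
# All values are placeholders; org.postgresql.Driver must be on the Spark classpath.
df = spark.read.format("jdbc") \
    .option("url", "jdbc:postgresql://your-host:5432/your-database") \
    .option("dbtable", "your-table-name") \
    .option("user", "your-username") \
    .option("password", "your-password") \
    .option("driver", "org.postgresql.Driver") \
    .load()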

To create the custom Databricks data source class, you will need to override the read_data method described in Custom data sources and add the JDBC jar to the Spark classpath.

To connect to a Databricks data source, you need:

  • Databricks URL (databricks-url)

    For information about your Databricks URL, see Building the connection URL for the Databricks driver in the Databricks documentation.

  • Databricks personal access token (personal-access-token)

    For information about your Databricks access token, see Databricks personal access token authentication in the Databricks documentation.

  • Data catalog name (db-catalog)

    For information about your Databricks catalog name, see Catalog name in the Databricks documentation.

  • Schema name (db-schema)

    For information about your Databricks schema name, see Schema name in the Databricks documentation.

  • Table name (db-table-name)

    For information about your Databricks table name, see Table name in the Databricks documentation.

  • (Optional) If using Secrets Manager, you’ll need the secret name (secret-databricks-account-info) where you store your Databricks JDBC URL and personal access token in Secrets Manager.

    For information about Secrets Manager, see Find secrets in AWS Secrets Manager in the AWS Secrets Manager User Guide.

  • AWS Region (your-region)

    For information about obtaining your current session’s region name using SDK for Python (Boto3), see region_name in the Boto3 documentation.

The following example demonstrates how to retrieve the JDBC URL and personal access token from Secrets Manager and override the read_data method for your custom data source class, DatabricksDataSource.

from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3

class DatabricksDataSource(PySparkDataSource):

    data_source_name = "Databricks"
    data_source_unique_id = "databricks-url"

    def read_data(self, spark, params):
        secret_name = "secret-databricks-account-info"
        region_name = "your-region"
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd'])
        return spark.read.format("jdbc") \
            .option("url", jdbc_url) \
            .option("dbtable", "`db-catalog`.`db-schema`.`db-table-name`") \
            .option("driver", "com.simba.spark.jdbc.Driver") \
            .load()

To add the Databricks JDBC driver to the Spark classpath, first upload the driver jar, jdbc-jar-file-name.jar, to Amazon S3. For information about downloading the Spark JDBC driver (jdbc-jar-file-name.jar) from Databricks, see Download JDBC Driver on the Databricks website.
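
The upload itself can be done with the SDK for Python (Boto3). The following is a minimal sketch; the local file path is a placeholder, and the bucket name and prefix match the ones used in the next example.

import boto3

# Minimal sketch: upload the downloaded driver jar to Amazon S3 so that it can be
# referenced through spark.jars. The local path, bucket, and prefix are placeholders.
s3_client = boto3.client("s3", region_name="your-region")
s3_client.upload_file(
    Filename="/path/to/jdbc-jar-file-name.jar",
    Bucket="your-bucket-name",
    Key="your-bucket-prefix/jdbc-jar-file-name.jar",
)

The following example adds the uploaded jar to the Spark classpath through spark.jars and connects DatabricksDataSource to your feature_processor decorator.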

from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"}
)
def transform(input_df):
    return input_df

To run the feature processor job remotely, you need to provide the JDBC driver jar by defining SparkConfig and passing it to the @remote decorator.

from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
        "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df

Streaming custom data source examples

You can connect to streaming data sources like Amazon Kinesis, and author transforms with Spark Structured Streaming to read from streaming data sources. For information about the Kinesis connector, see Kinesis Connector for Spark Structured Streaming in GitHub. For information about Amazon Kinesis, see What Is Amazon Kinesis Data Streams? in the Amazon Kinesis Developer Guide.

To create the custom Amazon Kinesis data source class, you will need to extend the BaseDataSource class and override the read_data method from Custom data sources.

To connect to an Amazon Kinesis data stream you need:

  • Kinesis ARN (kinesis-resource-arn)

    For information on Kinesis data stream ARNs, see Amazon Resource Names (ARNs) for Kinesis Data Streams in the Amazon Kinesis Developer Guide.

  • Kinesis data stream name (kinesis-stream-name)

  • AWS Region (your-region)

    For information about obtaining your current session’s region name using SDK for Python (Boto3), see region_name in the Boto3 documentation.

The following example demonstrates how to extend BaseDataSource and override the read_data method for your custom data source class, KinesisDataSource.

from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "kinesis-resource-arn"

    def read_data(self, spark, params):
        return spark.readStream.format("kinesis") \
            .option("streamName", "kinesis-stream-name") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.your-region.amazonaws.com") \
            .load()

The following example demonstrates how to connect KinesisDataSource to your feature_processor decorator.

from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
from sagemaker.feature_store.feature_processor import feature_processor
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "feature-group-arn"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults",
            "Properties": {
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark:spark-sql-kinesis_2.13:1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200 # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="feature-group-arn"
)
def transform(input_df):
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "s3a://checkpoint-path")
        .start()
    )
    output_stream.awaitTermination()

In the example code above we use a few Spark Structured Streaming options while streaming micro-batches into your feature group. For a full list of options, see the Structured Streaming Programming Guide in the Apache Spark documentation.

  • The foreachBatch sink allows you to apply operations and custom write logic to the output data of each micro-batch of a streaming query.

    For information on foreachBatch, see Using Foreach and ForeachBatch in the Apache Spark Structured Streaming Programming Guide.

  • The checkpointLocation option periodically saves the state of the streaming application. The streaming log is saved in checkpoint location s3a://checkpoint-path.

    For information on the checkpointLocation option, see Recovering from Failures with Checkpointing in the Apache Spark Structured Streaming Programming Guide.

  • The trigger setting defines how often the micro-batch processing is triggered in a streaming application. In the example, the processing time trigger type is used with one-minute micro-batch intervals, specified by trigger(processingTime="1 minute"). To backfill from a stream source, you can use the available-now trigger type, specified by trigger(availableNow=True), as shown in the sketch after this list.

    For a full list of trigger types, see Triggers in the Apache Spark Structured Streaming Programming Guide.
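
For example, to backfill instead of streaming continuously, the body of the transform function from the previous example can be adapted as follows. This is a minimal sketch under the same assumptions as that example (the same input_df, ingest_micro_batch_into_fg function, and checkpoint location); only the trigger changes.

# Minimal sketch: backfill all data currently available in the stream, then stop.
output_stream = (
    input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
    .writeStream.foreachBatch(ingest_micro_batch_into_fg)
    .trigger(availableNow=True)
    .option("checkpointLocation", "s3a://checkpoint-path")
    .start()
)
output_stream.awaitTermination()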

Continuous streaming and automatic retries using event based triggers

The Feature Processor uses SageMaker Training as its compute infrastructure, which has a maximum runtime limit of 28 days. You can use event based triggers to extend continuous streaming beyond that limit and to recover from transient failures. For more information on schedule and event based executions, see Scheduled and event based executions for Feature Processor pipelines.

The following is an example of setting up an event based trigger to keep the streaming Feature Processor pipeline running continuously. It uses the streaming transform function defined in the previous example. A target pipeline can be configured to be triggered when a STOPPED or FAILED event occurs for a source pipeline execution. Note that the same pipeline is used as the source and the target so that it runs continuously.

import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvents
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "streaming-pipeline"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform # defined in previous section
)

# The same pipeline is both the source and the target, so a STOPPED or FAILED
# execution triggers a new execution and the stream keeps running.
fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=streaming_pipeline_name,
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=streaming_pipeline_name
)