Custom data source examples
This section provides examples of custom data source implementations for Feature Processor. For more information about custom data sources, see Custom data sources.
Security is a shared responsibility between AWS and our customers. AWS is responsible for protecting the infrastructure that runs the services in the AWS Cloud. Customers are responsible for all of their necessary security configuration and management tasks. For example, secrets such as access credentials to data stores should not be hard coded in your custom data sources. You can use AWS Secrets Manager to manage these credentials. For information about Secrets Manager, see What is AWS Secrets Manager? in the AWS Secrets Manager User Guide. The following examples use Secrets Manager for your credentials.
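As a minimal sketch of storing such credentials, the following creates a JSON secret with the SDK for Python (Boto3). The secret name and key names shown here match the placeholders used in the Amazon Redshift example later in this section and are assumptions; the key names must match whatever your read_data code later reads back.

import json
import boto3

# Store data store credentials as a JSON secret so they are not hard coded in your
# custom data source. Replace the placeholder values with your own.
sm_client = boto3.client("secretsmanager", region_name="your-region")
sm_client.create_secret(
    Name="secret-redshift-account-info",
    SecretString=json.dumps({
        "jdbcurl": "jdbc-url",
        "username": "redshift-user",
        "password": "redshift-password",
    }),
)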
Topics
- Amazon Redshift Clusters (JDBC) custom data source examples
- Snowflake custom data source examples
- Databricks (JDBC) custom data source examples
- Streaming custom data source examples
Amazon Redshift Clusters (JDBC) custom data source examples
Amazon Redshift offers a JDBC driver that can be used to read data with Spark. For information about how to download the Amazon Redshift JDBC driver, see Download the Amazon Redshift JDBC driver, version 2.1.
To create the custom Amazon Redshift data source class, you need to override the read_data method from Custom data sources.
To connect to an Amazon Redshift cluster, you need your:
- Amazon Redshift JDBC URL (jdbc-url)
  For information about obtaining your Amazon Redshift JDBC URL, see Getting the JDBC URL in the Amazon Redshift Database Developer Guide.
- Amazon Redshift user name (redshift-user) and password (redshift-password)
  For information about how to create and manage database users using Amazon Redshift SQL commands, see Users in the Amazon Redshift Database Developer Guide.
- Amazon Redshift table name (redshift-table-name)
  For information about how to create a table, with some examples, see CREATE TABLE in the Amazon Redshift Database Developer Guide.
- (Optional) If using Secrets Manager, you'll need the secret name (secret-redshift-account-info) where you store your Amazon Redshift access user name and password in Secrets Manager.
  For information about Secrets Manager, see Find secrets in AWS Secrets Manager in the AWS Secrets Manager User Guide.
- AWS Region (your-region)
  For information about obtaining your current session's Region name using the SDK for Python (Boto3), see region_name in the Boto3 documentation.
The following example demonstrates how to retrieve the JDBC URL, user name, and password from Secrets Manager and override the read_data method for your custom data source class, RedshiftDataSource.
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3

class RedshiftDataSource(PySparkDataSource):

    data_source_name = "Redshift"
    data_source_unique_id = "redshift-resource-arn"

    def read_data(self, spark, params):
        url = "jdbc-url?user=redshift-user&password=redshift-password"
        aws_iam_role_arn = "redshift-command-access-role"
        secret_name = "secret-redshift-account-info"
        region_name = "your-region"
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        # Substitute the real JDBC URL and credentials retrieved from Secrets Manager.
        jdbc_url = url.replace("jdbc-url", secrets["jdbcurl"]) \
            .replace("redshift-user", secrets['username']) \
            .replace("redshift-password", secrets['password'])
        return spark.read \
            .format("jdbc") \
            .option("url", jdbc_url) \
            .option("driver", "com.amazon.redshift.Driver") \
            .option("dbtable", "redshift-table-name") \
            .option("tempdir", "s3a://your-bucket-name/your-bucket-prefix") \
            .option("aws_iam_role", aws_iam_role_arn) \
            .load()
The following example shows how to connect RedshiftDataSource to your feature_processor decorator.
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"}
)
def transform(input_df):
    return input_df
To run the feature processor job remotely, you need to provide the JDBC driver by defining SparkConfig and passing it to the @remote decorator.
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
        "spark.jars.packages": "com.amazon.redshift:redshift-jdbc42:2.1.0.16"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[RedshiftDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
Snowflake custom data source examples
Snowflake provides a Spark connector that can be used for your feature_processor decorator. For information about the Snowflake connector for Spark, see Snowflake Connector for Spark in the Snowflake documentation.
To create the custom Snowflake data source class, you need to override the read_data method from Custom data sources and add the Spark connector packages to the Spark classpath.
To connect to a Snowflake data source, you need:
- Snowflake URL (sf-url)
  For information about URLs for accessing Snowflake web interfaces, see Account Identifiers in the Snowflake documentation.
- Snowflake database (sf-database)
  For information about obtaining the name of your database using Snowflake, see CURRENT_DATABASE in the Snowflake documentation.
- Snowflake database schema (sf-schema)
  For information about obtaining the name of your schema using Snowflake, see CURRENT_SCHEMA in the Snowflake documentation.
- Snowflake warehouse (sf-warehouse)
  For information about obtaining the name of your warehouse using Snowflake, see CURRENT_WAREHOUSE in the Snowflake documentation.
- Snowflake table name (sf-table-name)
- (Optional) If using Secrets Manager, you'll need the secret name (secret-snowflake-account-info) where you store your Snowflake access user name and password in Secrets Manager.
  For information about Secrets Manager, see Find secrets in AWS Secrets Manager in the AWS Secrets Manager User Guide.
- AWS Region (your-region)
  For information about obtaining your current session's Region name using the SDK for Python (Boto3), see region_name in the Boto3 documentation.
The following example demonstrates how to retrieve the Snowflake user name and password from Secrets Manager and override the read_data method for your custom data source class, SnowflakeDataSource.
from sagemaker.feature_store.feature_processor import PySparkDataSource
from sagemaker.feature_store.feature_processor import feature_processor
import json
import boto3

class SnowflakeDataSource(PySparkDataSource):

    sf_options = {
        "sfUrl": "sf-url",
        "sfDatabase": "sf-database",
        "sfSchema": "sf-schema",
        "sfWarehouse": "sf-warehouse",
    }

    data_source_name = "Snowflake"
    data_source_unique_id = "sf-url"

    def read_data(self, spark, params):
        secret_name = "secret-snowflake-account-info"
        region_name = "your-region"
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        self.sf_options["sfUser"] = secrets.get("username")
        self.sf_options["sfPassword"] = secrets.get("password")
        return spark.read.format("net.snowflake.spark.snowflake") \
            .options(**self.sf_options) \
            .option("dbtable", "sf-table-name") \
            .load()
The following example shows how to connect SnowflakeDataSource to your feature_processor decorator.
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"}
)
def transform(input_df):
    return input_df
To run the feature processor job remotely, you need to provide the packages by defining SparkConfig and passing it to the @remote decorator. In the Spark package in the following example, spark-snowflake_2.12 corresponds to the Feature Processor Scala version (2.12), 2.12.0 is the Snowflake connector version you wish to use, and spark_3.3 corresponds to the Feature Processor Spark version.
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
        "spark.jars.packages": "net.snowflake:spark-snowflake_2.12:2.12.0-spark_3.3"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[SnowflakeDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
Databricks (JDBC) custom data source examples
Spark can read data from Databricks by using the Databricks JDBC driver. For information about the Databricks JDBC driver, see Configure the Databricks ODBC and JDBC drivers in the Databricks documentation.
Note
You can read data from any other database by including the corresponding JDBC driver in the Spark classpath. For more information, see JDBC To Other Databases in the Spark SQL documentation.
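For instance, a minimal read_data sketch for a PostgreSQL source could look like the following. The class name, connection placeholders, and the Maven coordinate org.postgresql:postgresql:42.6.0 are illustrative assumptions; substitute the driver and URL for your database and add the driver to the Spark classpath (for example via spark_config={"spark.jars.packages": "org.postgresql:postgresql:42.6.0"}).

from sagemaker.feature_store.feature_processor import PySparkDataSource

class PostgresDataSource(PySparkDataSource):  # hypothetical example class

    data_source_name = "Postgres"
    data_source_unique_id = "postgres-resource-identifier"

    def read_data(self, spark, params):
        # Generic Spark JDBC read; the placeholder values below are assumptions.
        return spark.read.format("jdbc") \
            .option("url", "jdbc:postgresql://host:5432/database") \
            .option("driver", "org.postgresql.Driver") \
            .option("dbtable", "table-name") \
            .option("user", "db-user") \
            .option("password", "db-password") \
            .load()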
To create the custom Databricks data source class, you need to override the read_data method from Custom data sources and add the JDBC jar to the Spark classpath.
To connect to a Databricks data source, you need:
- Databricks URL (databricks-url)
  For information about your Databricks URL, see Building the connection URL for the Databricks driver in the Databricks documentation.
- Databricks personal access token (personal-access-token)
  For information about your Databricks access token, see Databricks personal access token authentication in the Databricks documentation.
- Data catalog name (db-catalog)
  For information about your Databricks catalog name, see Catalog name in the Databricks documentation.
- Schema name (db-schema)
  For information about your Databricks schema name, see Schema name in the Databricks documentation.
- Table name (db-table-name)
  For information about your Databricks table name, see Table name in the Databricks documentation.
- (Optional) If using Secrets Manager, you'll need the secret name (secret-databricks-account-info) where you store your Databricks access user name and password in Secrets Manager.
  For information about Secrets Manager, see Find secrets in AWS Secrets Manager in the AWS Secrets Manager User Guide.
- AWS Region (your-region)
  For information about obtaining your current session's Region name using the SDK for Python (Boto3), see region_name in the Boto3 documentation.
The following example demonstrates how to retrieve the JDBC URL and personal access token from Secrets Manager and override the read_data method for your custom data source class, DatabricksDataSource.
from sagemaker.feature_store.feature_processor import PySparkDataSource
import json
import boto3

class DatabricksDataSource(PySparkDataSource):

    data_source_name = "Databricks"
    data_source_unique_id = "databricks-url"

    def read_data(self, spark, params):
        secret_name = "secret-databricks-account-info"
        region_name = "your-region"
        session = boto3.session.Session()
        sm_client = session.client(
            service_name='secretsmanager',
            region_name=region_name,
        )
        secrets = json.loads(sm_client.get_secret_value(SecretId=secret_name)["SecretString"])
        # Substitute the personal access token retrieved from Secrets Manager into the JDBC URL.
        jdbc_url = secrets["jdbcurl"].replace("personal-access-token", secrets['pwd'])
        return spark.read.format("jdbc") \
            .option("url", jdbc_url) \
            .option("dbtable", "`db-catalog`.`db-schema`.`db-table-name`") \
            .option("driver", "com.simba.spark.jdbc.Driver") \
            .load()
The following example adds the JDBC driver jar, jdbc-jar-file-name.jar, which has been uploaded to Amazon S3, to the Spark classpath. For information about downloading the Spark JDBC driver (jdbc-jar-file-name.jar) from Databricks, see Download JDBC Driver in the Databricks documentation.
from sagemaker.feature_store.feature_processor import feature_processor

@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
    spark_config={"spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"}
)
def transform(input_df):
    return input_df
To run the feature processor job remotely, you need to provide the jars by defining SparkConfig and passing it to the @remote decorator.
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig

config = {
    "Classification": "spark-defaults",
    "Properties": {
        "spark.jars": "s3://your-bucket-name/your-bucket-prefix/jdbc-jar-file-name.jar"
    }
}

@remote(
    spark_config=SparkConfig(configuration=config),
    instance_type="ml.m5.2xlarge",
)
@feature_processor(
    inputs=[DatabricksDataSource()],
    output="feature-group-arn",
    target_stores=["OfflineStore"],
)
def transform(input_df):
    return input_df
Streaming custom data source examples
You can connect to streaming data sources such as Amazon Kinesis and author transforms with Spark Structured Streaming to read from streaming data sources. For information about the Kinesis connector, see Kinesis Connector for Spark Structured Streaming.
To create the custom Amazon Kinesis data source class, you need to extend the BaseDataSource class and override the read_data method from Custom data sources.
To connect to an Amazon Kinesis data stream, you need:
- Kinesis ARN (kinesis-resource-arn)
  For information about Kinesis data stream ARNs, see Amazon Resource Names (ARNs) for Kinesis Data Streams in the Amazon Kinesis Developer Guide.
- Kinesis data stream name (kinesis-stream-name)
- AWS Region (your-region)
  For information about obtaining your current session's Region name using the SDK for Python (Boto3), see region_name in the Boto3 documentation.
from sagemaker.feature_store.feature_processor import BaseDataSource
from sagemaker.feature_store.feature_processor import feature_processor

class KinesisDataSource(BaseDataSource):

    data_source_name = "Kinesis"
    data_source_unique_id = "kinesis-resource-arn"

    def read_data(self, spark, params):
        return spark.readStream.format("kinesis") \
            .option("streamName", "kinesis-stream-name") \
            .option("awsUseInstanceProfile", "false") \
            .option("endpointUrl", "https://kinesis.your-region.amazonaws.com") \
            .load()
The following example demonstrates how to connect KinesisDataSource to your feature_processor decorator.
from sagemaker.remote_function import remote
from sagemaker.remote_function.spark_config import SparkConfig
import feature_store_pyspark.FeatureStoreManager as fsm

def ingest_micro_batch_into_fg(input_df, epoch_id):
    feature_group_arn = "feature-group-arn"
    fsm.FeatureStoreManager().ingest_data(
        input_data_frame = input_df,
        feature_group_arn = feature_group_arn
    )

@remote(
    spark_config=SparkConfig(
        configuration={
            "Classification": "spark-defaults",
            "Properties": {
                "spark.sql.streaming.schemaInference": "true",
                "spark.jars.packages": "com.roncemer.spark:spark-sql-kinesis_2.13:1.2.2_spark-3.2"
            }
        }
    ),
    instance_type="ml.m5.2xlarge",
    max_runtime_in_seconds=2419200  # 28 days
)
@feature_processor(
    inputs=[KinesisDataSource()],
    output="feature-group-arn"
)
def transform(input_df):
    output_stream = (
        input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
        .writeStream.foreachBatch(ingest_micro_batch_into_fg)
        .trigger(processingTime="1 minute")
        .option("checkpointLocation", "s3a://checkpoint-path")
        .start()
    )
    output_stream.awaitTermination()
In the preceding example code, we use a few Spark Structured Streaming options while streaming micro-batches into your feature group. For a full list of options, see the Structured Streaming Programming Guide.
- The foreachBatch sink mode is a feature that allows you to apply operations and write logic on the output data of each micro-batch of a streaming query.
  For information about foreachBatch, see Using Foreach and ForeachBatch in the Apache Spark Structured Streaming Programming Guide.
- The checkpointLocation option periodically saves the state of the streaming application. The streaming log is saved in the checkpoint location s3a://checkpoint-path.
  For information about the checkpointLocation option, see Recovering from Failures with Checkpointing in the Apache Spark Structured Streaming Programming Guide.
- The trigger setting defines how often the micro-batch processing is triggered in a streaming application. In the example, the processing-time trigger type is used with one-minute micro-batch intervals, specified by trigger(processingTime="1 minute"). To backfill from a stream source, you can use the available-now trigger type, specified by trigger(availableNow=True), as shown in the sketch after this list.
  For a full list of trigger types, see Triggers in the Apache Spark Structured Streaming Programming Guide.
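As a sketch of the backfill case, the write inside transform could use the available-now trigger instead of a processing-time trigger. This assumes the same input_df, ingest_micro_batch_into_fg, and checkpoint location as the previous example, and that the stream source connector supports the available-now trigger; it processes all data currently available in the stream and then stops.

# Variant of the streaming write above that backfills from the stream source:
# process everything currently available in the stream, then stop.
backfill_stream = (
    input_df.selectExpr("CAST(rand() AS STRING) as partitionKey", "CAST(data AS STRING)")
    .writeStream.foreachBatch(ingest_micro_batch_into_fg)
    .trigger(availableNow=True)  # available-now trigger for backfill
    .option("checkpointLocation", "s3a://checkpoint-path")
    .start()
)
backfill_stream.awaitTermination()  # returns once the available data has been processed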
Continuous streaming and automatic retries using event based triggers
The Feature Processor uses SageMaker Training as its compute infrastructure, which has a maximum runtime limit of 28 days. You can use event based triggers to extend your continuous streaming over a longer period of time and recover from transient failures. For more information about schedule and event based executions, see Scheduled and event based executions for Feature Processor pipelines.
The following is an example of setting up an event based trigger to keep the streaming Feature Processor pipeline running continuously. This uses the streaming transform function defined in the previous example. A target pipeline can be configured to be triggered when a STOPPED or FAILED event occurs for a source pipeline execution. Note that the same pipeline is used as the source and target so that it runs continuously.
import sagemaker.feature_store.feature_processor as fp
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineEvents
from sagemaker.feature_store.feature_processor import FeatureProcessorPipelineExecutionStatus

streaming_pipeline_name = "streaming-pipeline"
streaming_pipeline_arn = fp.to_pipeline(
    pipeline_name = streaming_pipeline_name,
    step = transform  # defined in previous section
)

# The same pipeline is used as both source and target so that it runs continuously.
fp.put_trigger(
    source_pipeline_events=FeatureProcessorPipelineEvents(
        pipeline_name=streaming_pipeline_name,
        pipeline_execution_status=[
            FeatureProcessorPipelineExecutionStatus.STOPPED,
            FeatureProcessorPipelineExecutionStatus.FAILED]
    ),
    target_pipeline=streaming_pipeline_name
)