Amazon SageMaker 피처 스토어 스파크를 사용한 Batch 통합 - 아마존 SageMaker

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon SageMaker 피처 스토어 스파크를 사용한 Batch 통합

Amazon SageMaker 피처 스토어 스파크는 Spark 라이브러리를 피처 스토어에 연결하는 Spark 커넥터입니다. 특성 저장소 Spark는 Spark DataFrame에서 특성 그룹으로의 데이터 수집을 단순화합니다. 피처 스토어는 Amazon EMR, GIS, 작업, Amazon 프로세싱 작업 AWS Glue 또는 노트북에서 기존 ETL 파이프라인을 사용하여 Spark를 통한 일괄 데이터 통합을 지원합니다. SageMaker SageMaker

Python 및 Scala 개발자를 위해 일괄 데이터 수집을 설치, 구현하는 방법이 제공됩니다. Python 개발자는 Amazon Feature Store Spark 리포지토리의 지침에 따라 로컬 개발, Amazon EMR에 설치 및 Jupyter 노트북에 오픈 소스 sagemaker-feature-store-pyspark Python 라이브러리를 사용할 수 있습니다. SageMaker GitHub Scala 개발자는 아마존 피처 스토어 Spark SDK Maven 중앙 리포지토리에서 제공되는 SageMaker 피처 스토어 스파크 커넥터를 사용할 수 있습니다.

온라인 저장소, 오프라인 저장소 또는 둘 다 활성화되었는지 여부에 따라 Spark 커넥터를 사용하여 다음과 같은 방법으로 데이터를 수집할 수 있습니다.

  1. 기본 인제스트 — 온라인 스토어가 활성화된 경우 Spark 커넥터는 먼저 API를 사용하여 데이터프레임을 온라인 스토어에 수집합니다. PutRecord 이벤트 시간이 가장 긴 레코드만 온라인 저장소에 남습니다. 오프라인 저장소가 활성화되면 15분 이내에 특성 저장소가 데이터프레임을 오프라인 저장소에 수집합니다. 온라인 및 오프라인 저장소의 작동 방식에 대한 자세한 내용은 특성 저장소 개념섹션을 참조하세요.

    .ingest_data(...) 메서드에서 target_stores를 지정하지 않고 이 작업을 수행할 수 있습니다.

  2. 오프라인 저장소 직접 수집 - 오프라인 저장소가 활성화된 경우 Spark 커넥터 배치는 데이터프레임을 오프라인 저장소에 직접 수집합니다. 데이터프레임을 오프라인 저장소에 직접 수집해도 온라인 저장소는 업데이트되지 않습니다.

    .ingest_data(...) 메서드에서 target_stores=["OfflineStore"]를 설정하여 이 작업을 수행할 수 있습니다.

  3. 온라인 스토어만 해당 — 온라인 스토어가 활성화된 경우 Spark 커넥터는 API를 사용하여 데이터프레임을 온라인 스토어에 수집합니다. PutRecord 데이터프레임을 온라인 저장소에 직접 수집해도 오프라인 저장소는 업데이트되지 않습니다.

    .ingest_data(...) 메서드에서 target_stores=["OnlineStore"]를 설정하여 이 작업을 수행할 수 있습니다.

다양한 수집 방법 사용에 대한 자세한 내용은 예제 구현섹션을 참조하세요.

특성 저장소 Spark 설치

Scala 사용자

피처 스토어 스파크 SDK는 스칼라 사용자를 위한 아마존 SageMaker 피처 스토어 Spark SDK Maven 중앙 리포지토리에서 사용할 수 있습니다.

요구 사항

  • Spark >= 3.0.0 및 <= 3.3.0

  • iceberg-spark-runtime >= 0.14.0

  • Scala >= 2.12.x 

  • Amazon EMR >= 6.1.0 (Amazon EMR을 사용하는 경우에만 해당)

POM.xml 파일에서의 종속성 선언

특성 저장소 Spark 커넥터는 iceberg-spark-runtime라이브러리에 종속되어 있습니다. 따라서 Iceberg 테이블 형식으로 자동 생성한 특성 그룹에 데이터를 수집하려면 해당 버전의 iceberg-spark-runtime라이브러리를 종속 항목에 추가해야 합니다. 예를 들어 Spark 3.1을 사용하는 경우 프로젝트의 POM.xml에서 다음을 선언해야 합니다.

<dependency> <groupId>software.amazon.sagemaker.featurestore</groupId> <artifactId>sagemaker-feature-store-spark-sdk_2.12</artifactId> <version>1.0.0</version> </dependency> <dependency> <groupId>org.apache.iceberg</groupId> <artifactId>iceberg-spark-runtime-3.1_2.12</artifactId> <version>0.14.0</version> </dependency>

Python 사용자

피처 스토어 스파크 SDK는 오픈 소스 SageMaker Amazon 피처 스토어 GitHub 스파크 리포지토리에서 사용할 수 있습니다.

요구 사항

  • Spark >= 3.0.0 및 <= 3.3.0

  • Amazon EMR >= 6.1.0 (Amazon EMR을 사용하는 경우에만 해당)

  • 커널 = conda_python3

Spark가 설치된 디렉터리로 $SPARK_HOME을 설정하는 것이 좋습니다. 설치 중에 특성 저장소는 필수 JAR을 SPARK_HOME에 업로드하여 종속 항목이 자동으로 로드되도록 합니다. 이 라이브러리가 작동하려면 Spark에서 JVM을 시작해야 합니다. PySpark

로컬 설치

설치에 대한 자세한 정보를 찾으려면 --verbose를 다음 설치 명령을 추가하여 상세 내용 표시 모드를 활성화하세요.

pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

Amazon EMR에 설치

릴리스 버전 6.1.0 이상을 사용하여 Amazon EMR 클러스터를 생성합니다. SSH를 활성화하면 문제를 해결하는 데 도움이 됩니다.

다음을 수행하여 라이브러리를 설치할 수 있습니다.

  • Amazon EMR 내에서 사용자 지정 단계를 생성합니다.

  • SSH를 사용하여 클러스터에 연결하고 그곳에 라이브러리를 설치합니다.

참고

다음 정보는 Spark 버전 3.1을 사용하지만 요구 사항을 충족하는 모든 버전을 지정할 수 있습니다.

export SPARK_HOME=/usr/lib/spark sudo -E pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose
참고

종속 JAR을 SPARK_HOME에 자동으로 설치하려면 부트스트랩 단계를 사용하지 마세요.

노트북 인스턴스에 설치 SageMaker

다음 명령을 사용하여 Spark 커넥터와 호환되는 버전을 설치합니다. PySpark

!pip3 install pyspark==3.1.1 !pip3 install sagemaker-feature-store-pyspark-3.1 --no-binary :all:

오프라인 저장소에 일괄 수집을 수행하는 경우 종속성은 노트북 인스턴스 환경 내에 있지 않습니다.

from pyspark.sql import SparkSession import feature_store_pyspark extra_jars = ",".join(feature_store_pyspark.classpath_jars()) spark = SparkSession.builder \ .config("spark.jars", extra_jars) \ .config("spark.jars.packages", "org.apache.hadoop:hadoop-aws:3.2.1,org.apache.hadoop:hadoop-common:3.2.1") \ .getOrCreate()

GIS를 사용하는 노트북에 설치

중요

AWS Glue 버전 2.0 이상을 사용해야 합니다.

다음 정보를 사용하면 AWS Glue 대화형 세션 (GIS) 에 PySpark 커넥터를 설치하는 데 도움이 됩니다.

Amazon SageMaker Feature Store Spark를 사용하려면 세션을 초기화하는 동안 Amazon S3 버킷에 업로드할 특정 Spark 커넥터 JAR이 필요합니다. S3 버킷에 필수 JAR을 업로드하는 방법에 대한 자세한 내용은 특성 저장소 Spark JAR 검색섹션을 참조하세요.

JAR을 업로드한 후에는 다음 명령을 사용하여 JAR과 함께 GIS 세션을 제공해야 합니다.

%extra_jars s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar

AWS Glue 런타임에 피처 스토어 스파크를 설치하려면 GIS 노트북에서 %additional_python_modules magic 명령을 사용하십시오. AWS Glue 에서 지정한 pip 모듈로 실행됩니다. %additional_python_modules

%additional_python_modules sagemaker-feature-store-pyspark-3.1

AWS Glue 세션을 시작하기 전에 위의 두 가지 매직 명령을 모두 사용해야 합니다.

작업 중 설치 AWS Glue

중요

AWS Glue 버전 2.0 이상을 사용해야 합니다.

AWS Glue 작업에 Spark 커넥터를 설치하려면 다음 예와 --additional-python-modules 같이 인수를 사용하여 필요한 JAR을 제공하고 작업을 생성할 때 Spark 커넥터를 AWS Glue 작업 매개 --extra-jars 변수로 설치합니다. S3 버킷에 필수 JAR을 업로드하는 방법에 대한 자세한 내용은 특성 저장소 Spark JAR 검색섹션을 참조하세요.

glue_client = boto3.client('glue', region_name=region) response = glue_client.create_job( Name=pipeline_id, Description='Feature Store Compute Job', Role=glue_role_arn, ExecutionProperty={'MaxConcurrentRuns': max_concurrent_run}, Command={ 'Name': 'glueetl', 'ScriptLocation': script_location_uri, 'PythonVersion': '3' }, DefaultArguments={ '--TempDir': temp_dir_location_uri, '--additional-python-modules': 'sagemaker-feature-store-pyspark-3.1', '--extra-jars': "s3:/<YOUR_BUCKET>/spark-connector-jars/sagemaker-feature-store-spark-sdk.jar", ... }, MaxRetries=3, NumberOfWorkers=149, Timeout=2880, GlueVersion='3.0', WorkerType='G.2X' )

Amazon SageMaker 프로세싱 작업에 설치

Feature Store Spark를 Amazon SageMaker 프로세싱 작업과 함께 사용하려면 자체 이미지를 가져오세요. 이미지 가져오기에 대한 자세한 내용은 자체 SageMaker 이미지를 가져오세요.섹션을 참조하세요. Dockerfile에 설치 단계를 추가합니다. Docker 이미지를 Amazon ECR 리포지토리로 푸시한 후 를 사용하여 처리 작업을 생성할 수 있습니다. PySparkProcessor PySpark 프로세서로 처리 작업을 생성하는 방법에 대한 자세한 내용은 을 참조하십시오. Apache Spark를 사용한 데이터 처리

다음은 Dockerfile에 설치 단계를 추가하는 예제입니다.

FROM <ACCOUNT_ID>.dkr.ecr.<AWS_REGION>.amazonaws.com/sagemaker-spark-processing:3.1-cpu-py38-v1.0 RUN /usr/bin/python3 -m pip install sagemaker-feature-store-pyspark-3.1 --no-binary :all: --verbose

특성 저장소 Spark JAR 검색

특성 저장소 Spark 종속성 JAR을 검색하려면 네트워크 액세스가 가능한 Python 환경에서 pip를 사용하여 Python Package Index (PyPI) 리포지토리에서 Spark 커넥터를 설치해야 합니다. SageMaker Jupyter 노트북은 네트워크 액세스가 가능한 Python 환경의 예입니다.

다음 명령은 Spark 커넥터를 설치합니다.

!pip install sagemaker-feature-store-pyspark-3.1

특성 저장소 Spark를 설치한 후 JAR 위치를 검색하고 JAR을 Amazon S3에 업로드할 수 있습니다.

feature-store-pyspark-dependency-jars명령은 특성 저장소 Spark가 추가한 필수 종속성 JAR의 위치를 제공합니다. 명령을 사용하여 JAR을 검색하고 이를 Amazon S3에 업로드할 수 있습니다.

jar_location = !feature-store-pyspark-dependency-jars jar_location = jar_location[0] s3_client = boto3.client("s3") s3_client.upload_file(jar_location, "<YOUR_BUCKET>","spark-connector-jars/sagemaker-feature-store-spark-sdk.jar")

예제 구현

Example Python script

FeatureStoreBatchIngestion.py

from pyspark.sql import SparkSession from feature_store_pyspark.FeatureStoreManager import FeatureStoreManager import feature_store_pyspark spark = SparkSession.builder \ .getOrCreate() # Construct test DataFrame columns = ["RecordIdentifier", "EventTime"] data = [("1","2021-03-02T12:20:12Z"), ("2", "2021-03-02T12:20:13Z"), ("3", "2021-03-02T12:20:14Z")] df = spark.createDataFrame(data).toDF(*columns) # Initialize FeatureStoreManager with a role arn if your feature group is created by another account feature_store_manager= FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") # Load the feature definitions from input schema. The feature definitions can be used to create a feature group feature_definitions = feature_store_manager.load_feature_definitions_from_schema(df) feature_group_arn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" # Ingest by default. The connector will leverage PutRecord API to ingest your data in stream # https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn) # To select the target stores for ingestion, you can specify the target store as the paramter # If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore", "OnlineStore"]) # If only OfflineStore is selected, the connector will batch write the data to offline store directly feature_store_manager.ingest_data(input_data_frame=df, feature_group_arn=feature_group_arn, target_stores=["OfflineStore"]) # To retrieve the records failed to be ingested by spark connector failed_records_df = feature_store_manager.get_failed_stream_ingestion_data_frame()

예제 Python 스크립트가 포함된 Spark 작업 제출

이 PySpark 버전에는 추가 종속 JAR을 가져와야 하므로 Spark 애플리케이션을 실행하려면 추가 단계가 필요합니다.

설치 중 SPARK_HOME을 지정하지 않은 경우 spark-submit실행 시 필요한 JAR을 JVM에 로드해야 합니다. feature-store-pyspark-dependency-jars는 모든 JAR의 경로를 자동으로 가져오기 위해 Spark 라이브러리에서 설치한 Python 스크립트입니다.

spark-submit --jars `feature-store-pyspark-dependency-jars` FeatureStoreBatchIngestion.py

Amazon EMR에서 이 애플리케이션을 실행하는 경우 종속 JAR을 다른 태스크 노드에 배포할 필요가 없도록 클라이언트 모드에서 애플리케이션을 실행하는 것이 좋습니다. Amazon EMR 클러스터에 다음과 비슷한 Spark 인수를 사용하여 단계를 하나 더 추가합니다.

spark-submit --deploy-mode client --master yarn s3:/<PATH_TO_SCRIPT>/FeatureStoreBatchIngestion.py
Example Scala script

FeatureStoreBatchIngestion.scala

import software.amazon.sagemaker.featurestore.sparksdk.FeatureStoreManager import org.apache.spark.sql.types.{StringType, StructField, StructType} import org.apache.spark.sql.{Row, SparkSession} object TestSparkApp { def main(args: Array[String]): Unit = { val spark = SparkSession.builder().getOrCreate() // Construct test DataFrame val data = List( Row("1", "2021-07-01T12:20:12Z"), Row("2", "2021-07-02T12:20:13Z"), Row("3", "2021-07-03T12:20:14Z") ) val schema = StructType( List(StructField("RecordIdentifier", StringType), StructField("EventTime", StringType)) ) val df = spark.createDataFrame(spark.sparkContext.parallelize(data), schema) // Initialize FeatureStoreManager with a role arn if your feature group is created by another account val featureStoreManager = new FeatureStoreManager("arn:aws:iam::111122223333:role/role-arn") // Load the feature definitions from input schema. The feature definitions can be used to create a feature group val featureDefinitions = featureStoreManager.loadFeatureDefinitionsFromSchema(df) val featureGroupArn = "arn:aws:sagemaker:<AWS_REGION>:<ACCOUNT_ID>:feature-group/<YOUR_FEATURE_GROUP_NAME>" // Ingest by default. The connector will leverage PutRecord API to ingest your data in stream // https://docs.aws.amazon.com/sagemaker/latest/APIReference/API_feature_store_PutRecord.html featureStoreManager.ingestData(df, featureGroupArn) // To select the target stores for ingestion, you can specify the target store as the paramter // If OnlineStore is selected, the connector will leverage PutRecord API to ingest your data in stream featureStoreManager.ingestData(df, featureGroupArn, List("OfflineStore", "OnlineStore")) // If only OfflineStore is selected, the connector will batch write the data to offline store directly featureStoreManager.ingestData(df, featureGroupArn, ["OfflineStore"]) // To retrieve the records failed to be ingested by spark connector val failedRecordsDf = featureStoreManager.getFailedStreamIngestionDataFrame() } }

Spark 작업 제출

Scala

특성 저장소 Spark를 일반 종속 항목으로 사용할 수 있어야 합니다. 모든 플랫폼에서 애플리케이션을 실행하는 데 추가 지침은 필요하지 않습니다.