EMR Serverless와 함께 델타 레이크 OSS 사용 - Amazon EMR

EMR Serverless와 함께 델타 레이크 OSS 사용

Amazon EMR 버전 6.9.0 이상

참고

Amazon EMR 7.0.0 이상은 Delta Lake 3.0.0(delta-core.jar 이름이 delta-spark.jar로 바뀜)을 사용합니다. Amazon EMR 7.0.0 이상을 사용하는 경우 구성에서 delta-spark.jar을 지정해야 합니다.

Amazon EMR 6.9.0 이상에는 Delta Lake가 포함되어 있으므로 더 이상 Delta Lake를 직접 패키지로 제공하거나 EMR Serverless 작업과 함께 --packages 플래그를 제공하지 않아도 됩니다.

  1. EMR Serverless 작업을 제출하는 경우 다음 구성 속성이 있고 sparkSubmitParameters 필드에 다음 파라미터를 포함하는지 확인합니다.

    --conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog
  2. 로컬 delta_sample.py를 생성하여 델타 테이블 생성 및 읽기를 테스트합니다.

    # delta_sample.py from pyspark.sql import SparkSession import uuid url = "s3://amzn-s3-demo-bucket/delta-lake/output/%s/" % str(uuid.uuid4()) spark = SparkSession.builder.appName("DeltaSample").getOrCreate() ## creates a Delta table and outputs to target S3 bucket spark.range(5).write.format("delta").save(url) ## reads a Delta table and outputs to target S3 bucket spark.read.format("delta").load(url).show
  3. AWS CLI를 사용하여 delta_sample.py 파일을 Amazon S3 버킷에 업로드합니다. 그런 다음, start-job-run 명령을 사용하여 기존 EMR Serverless 애플리케이션에 작업을 제출합니다.

    aws s3 cp delta_sample.py s3://amzn-s3-demo-bucket/code/ aws emr-serverless start-job-run \ --application-id application-id \ --execution-role-arn job-role-arn \ --name emr-delta \ --job-driver '{ "sparkSubmit": { "entryPoint": "s3://amzn-s3-demo-bucket/code/delta_sample.py", "sparkSubmitParameters": "--conf spark.jars=/usr/share/aws/delta/lib/delta-core.jar,/usr/share/aws/delta/lib/delta-storage.jar --conf spark.sql.extensions=io.delta.sql.DeltaSparkSessionExtension --conf spark.sql.catalog.spark_catalog=org.apache.spark.sql.delta.catalog.DeltaCatalog" } }'

Delta Lake와 함께 Python 라이브러리를 사용하려면 라이브러리를 종속 항목으로 패키징하거나 사용자 지정 이미지로 사용하여 delta-core 라이브러리를 추가할 수 있습니다.

또는 SparkContext.addPyFile을 사용하여 delta-core JAR 파일에서 Python 라이브러리를 추가할 수 있습니다.

import glob from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() spark.sparkContext.addPyFile(glob.glob("/usr/share/aws/delta/lib/delta-core_*.jar")[0])

Amazon EMR 버전 6.8.0 이하

Amazon EMR 6.8.0 이하를 사용하는 경우 다음 단계를 수행하여 EMR 서버리스 애플리케이션에서 Delta Lake OSS를 사용합니다.

  1. Amazon EMR Serverless 애플리케이션의 Spark 버전과 호환되는 오픈 소스 버전의 Delta Lake를 빌드하려면 Delta GitHub로 이동한 후 지침을 따릅니다.

  2. Delta Lake 라이브러리를 AWS 계정의 Amazon S3 버킷에 업로드합니다.

  3. 애플리케이션 구성에서 EMR Serverless 작업을 제출하는 경우 현재 버킷에 있는 Delta Lake JAR 파일을 포함합니다.

    --conf spark.jars=s3://amzn-s3-demo-bucket/jars/delta-core_2.12-1.1.0.jar
  4. Delta 테이블에서 읽고 쓸 수 있는지 확인하려면 샘플 PySpark 테스트를 실행합니다.

    from pyspark import SparkConf, SparkContext from pyspark.sql import HiveContext, SparkSession import uuid conf = SparkConf() sc = SparkContext(conf=conf) sqlContext = HiveContext(sc) url = "s3://amzn-s3-demo-bucket/delta-lake/output/1.0.1/%s/" % str(uuid.uuid4()) ## creates a Delta table and outputs to target S3 bucket session.range(5).write.format("delta").save(url) ## reads a Delta table and outputs to target S3 bucket session.read.format("delta").load(url).show