쿠키 기본 설정 선택

당사는 사이트와 서비스를 제공하는 데 필요한 필수 쿠키 및 유사한 도구를 사용합니다. 고객이 사이트를 어떻게 사용하는지 파악하고 개선할 수 있도록 성능 쿠키를 사용해 익명의 통계를 수집합니다. 필수 쿠키는 비활성화할 수 없지만 '사용자 지정' 또는 ‘거부’를 클릭하여 성능 쿠키를 거부할 수 있습니다.

사용자가 동의하는 경우 AWS와 승인된 제3자도 쿠키를 사용하여 유용한 사이트 기능을 제공하고, 사용자의 기본 설정을 기억하고, 관련 광고를 비롯한 관련 콘텐츠를 표시합니다. 필수가 아닌 모든 쿠키를 수락하거나 거부하려면 ‘수락’ 또는 ‘거부’를 클릭하세요. 더 자세한 내용을 선택하려면 ‘사용자 정의’를 클릭하세요.

Amazon EMR Serverless를 사용하여 DynamoDB에 연결 - Amazon EMR

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

Amazon EMR Serverless를 사용하여 DynamoDB에 연결

이 자습서에서는 미국 지명위원회에서 Amazon S3 버킷에 데이터 하위 세트를 업로드한 다음, Amazon EMR Serverless에서 Hive 또는 Spark를 사용하여 쿼리할 수 있는 Amazon DynamoDB 테이블로 데이터를 복사합니다.

1단계: Amazon S3 버킷에 데이터 업로드

Amazon S3 버킷을 생성하려면 Amazon Simple Storage Service 콘솔 사용 설명서의 버킷 생성에 나온 지침을 따르세요. amzn-s3-demo-bucket에 대한 참조를 새로 생성된 버킷의 이름으로 바꿉니다. 이제 EMR Serverless 애플리케이션에서 작업을 실행할 준비가 되었습니다.

  1. 다음 명령을 사용하여 샘플 데이터 아카이브(features.zip)를 다운로드합니다.

    wget https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/features.zip
  2. 아카이브에서 features.txt 파일을 추출하고 파일의 처음 몇 줄을 봅니다.

    unzip features.zip head features.txt

    결과는 다음과 비슷합니다.

    1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794 875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7 1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10 26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681 1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605 1181348|Minnow Run|Stream|PA|40.0820178|-79.3800349|1558 1288759|Hunting Creek|Stream|TN|36.343969|-83.8029682|1024 533060|Big Charles Bayou|Bay|LA|29.6046517|-91.9828654|0 829689|Greenwood Creek|Stream|NE|41.596086|-103.0499296|3671 541692|Button Willow Island|Island|LA|31.9579389|-93.0648847|98

    여기에 나온 각 줄의 필드는 고유 식별자, 이름, 자연 특성 유형, 주, 위도, 경도, 고도(피트)를 나타냅니다.

  3. Amazon S3에 데이터 업로드

    aws s3 cp features.txt s3://amzn-s3-demo-bucket/features/

2단계: Hive 테이블 생성

Apache Spark 또는 Hive를 사용하여 Amazon S3에 업로드된 데이터가 포함된 새 Hive 테이블을 생성합니다.

Spark

Spark에서 Hive 테이블을 생성하려면 다음 명령을 실행합니다.

import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate() sparkSession.sql("CREATE TABLE hive_features \ (feature_id BIGINT, \ feature_name STRING, \ feature_class STRING, \ state_alpha STRING, \ prim_lat_dec DOUBLE, \ prim_long_dec DOUBLE, \ elev_in_ft BIGINT) \ ROW FORMAT DELIMITED \ FIELDS TERMINATED BY '|' \ LINES TERMINATED BY '\n' \ LOCATION 's3://amzn-s3-demo-bucket/features';")

이제 features.txt 파일의 데이터로 고유 Hive 테이블을 채워졌습니다. 데이터가 테이블에 있는지 확인하려면 다음 예제와 같이 Spark SQL 쿼리를 실행합니다.

sparkSession.sql( "SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;")
Hive

Hive에서 Hive 테이블을 생성하려면 다음 명령을 실행합니다.

CREATE TABLE hive_features (feature_id BIGINT, feature_name STRING , feature_class STRING , state_alpha STRING, prim_lat_dec DOUBLE , prim_long_dec DOUBLE , elev_in_ft BIGINT) ROW FORMAT DELIMITED FIELDS TERMINATED BY '|' LINES TERMINATED BY '\n' LOCATION 's3://amzn-s3-demo-bucket/features';

이제 features.txt 파일의 데이터를 포함하는 Hive 테이블이 있습니다. 데이터가 테이블에 있는지 확인하려면 다음 예제와 같이 HiveQL 쿼리를 실행합니다.

SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;

Spark에서 Hive 테이블을 생성하려면 다음 명령을 실행합니다.

import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate() sparkSession.sql("CREATE TABLE hive_features \ (feature_id BIGINT, \ feature_name STRING, \ feature_class STRING, \ state_alpha STRING, \ prim_lat_dec DOUBLE, \ prim_long_dec DOUBLE, \ elev_in_ft BIGINT) \ ROW FORMAT DELIMITED \ FIELDS TERMINATED BY '|' \ LINES TERMINATED BY '\n' \ LOCATION 's3://amzn-s3-demo-bucket/features';")

이제 features.txt 파일의 데이터로 고유 Hive 테이블을 채워졌습니다. 데이터가 테이블에 있는지 확인하려면 다음 예제와 같이 Spark SQL 쿼리를 실행합니다.

sparkSession.sql( "SELECT state_alpha, COUNT(*) FROM hive_features GROUP BY state_alpha;")

3단계: DynamoDB로 데이터 복사

Spark 또는 Hive를 사용하여 데이터를 새 DynamoDB 테이블에 복사합니다.

Spark

이전 단계에서 생성한 Hive 테이블의 데이터를 DynamoDB로 복사하려면 DynamoDB로 데이터 복사1~3단계를 수행합니다. 그러면 새 DynamoDB 테이블(Features)이 생성됩니다. 그리고 다음 예제와 같이 텍스트 파일에서 직접 데이터를 읽고 DynamoDB 테이블에 복사할 수 있습니다.

import com.amazonaws.services.dynamodbv2.model.AttributeValue import org.apache.hadoop.dynamodb.DynamoDBItemWritable import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object EmrServerlessDynamoDbTest { def main(args: Array[String]): Unit = { jobConf.set("dynamodb.input.tableName", "Features") jobConf.set("dynamodb.output.tableName", "Features") jobConf.set("dynamodb.region", "region") jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") val rdd = sc.textFile("s3://amzn-s3-demo-bucket/ddb-connector/") .map(row => { val line = row.split("\\|") val item = new DynamoDBItemWritable() val elevInFt = if (line.length > 6) { new AttributeValue().withN(line(6)) } else { new AttributeValue().withNULL(true) } item.setItem(Map( "feature_id" -> new AttributeValue().withN(line(0)), "feature_name" -> new AttributeValue(line(1)), "feature_class" -> new AttributeValue(line(2)), "state_alpha" -> new AttributeValue(line(3)), "prim_lat_dec" -> new AttributeValue().withN(line(4)), "prim_long_dec" -> new AttributeValue().withN(line(5)), "elev_in_ft" -> elevInFt) .asJava) (new Text(""), item) }) rdd.saveAsHadoopDataset(jobConf) } }
Hive

이전 단계에서 생성한 Hive 테이블의 데이터를 DynamoDB로 복사하려면 DynamoDB로 데이터 복사의 지침을 따릅니다.

이전 단계에서 생성한 Hive 테이블의 데이터를 DynamoDB로 복사하려면 DynamoDB로 데이터 복사1~3단계를 수행합니다. 그러면 새 DynamoDB 테이블(Features)이 생성됩니다. 그리고 다음 예제와 같이 텍스트 파일에서 직접 데이터를 읽고 DynamoDB 테이블에 복사할 수 있습니다.

import com.amazonaws.services.dynamodbv2.model.AttributeValue import org.apache.hadoop.dynamodb.DynamoDBItemWritable import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object EmrServerlessDynamoDbTest { def main(args: Array[String]): Unit = { jobConf.set("dynamodb.input.tableName", "Features") jobConf.set("dynamodb.output.tableName", "Features") jobConf.set("dynamodb.region", "region") jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") val rdd = sc.textFile("s3://amzn-s3-demo-bucket/ddb-connector/") .map(row => { val line = row.split("\\|") val item = new DynamoDBItemWritable() val elevInFt = if (line.length > 6) { new AttributeValue().withN(line(6)) } else { new AttributeValue().withNULL(true) } item.setItem(Map( "feature_id" -> new AttributeValue().withN(line(0)), "feature_name" -> new AttributeValue(line(1)), "feature_class" -> new AttributeValue(line(2)), "state_alpha" -> new AttributeValue(line(3)), "prim_lat_dec" -> new AttributeValue().withN(line(4)), "prim_long_dec" -> new AttributeValue().withN(line(5)), "elev_in_ft" -> elevInFt) .asJava) (new Text(""), item) }) rdd.saveAsHadoopDataset(jobConf) } }

4단계: DynamoDB에서 데이터 쿼리

Spark 또는 Hive를 사용하여 DynamoDB 테이블을 쿼리합니다.

Spark

이전 단계에서 생성한 DynamoDB 테이블에서 데이터를 쿼리하기 위해 Spark SQL 또는 Spark MapReduce API를 사용할 수 있습니다.

예 - Spark SQL을 사용하여 DynamoDB 테이블 쿼리

다음 Spark SQL 쿼리는 모든 기능 유형의 목록을 사전순으로 반환합니다.

val dataFrame = sparkSession.sql("SELECT DISTINCT feature_class \ FROM ddb_features \ ORDER BY feature_class;")

다음 Spark SQL 쿼리는 문자 M으로 시작하는 모든 레이크의 목록을 반환합니다.

val dataFrame = sparkSession.sql("SELECT feature_name, state_alpha \ FROM ddb_features \ WHERE feature_class = 'Lake' \ AND feature_name LIKE 'M%' \ ORDER BY feature_name;")

다음 Spark SQL 쿼리는 1마일보다 큰 세 개 이상의 기능이 있는 모든 상태 목록을 반환합니다.

val dataFrame = sparkSession.dql("SELECT state_alpha, feature_class, COUNT(*) \ FROM ddb_features \ WHERE elev_in_ft > 5280 \ GROUP by state_alpha, feature_class \ HAVING COUNT(*) >= 3 \ ORDER BY state_alpha, feature_class;")
예 - Spark MapReduce API를 사용하여 DynamoDB 테이블 쿼리

다음 MapReduce 쿼리는 모든 기능 유형의 목록을 사전순으로 반환합니다.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .map(pair => pair._2.get("feature_class").getS) .distinct() .sortBy(value => value) .toDF("feature_class")

다음 MapReduce 쿼리는 문자 M으로 시작하는 모든 레이크의 목록을 반환합니다.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .filter(pair => "Lake".equals(pair._2.get("feature_class").getS)) .filter(pair => pair._2.get("feature_name").getS.startsWith("M")) .map(pair => (pair._2.get("feature_name").getS, pair._2.get("state_alpha").getS)) .sortBy(_._1) .toDF("feature_name", "state_alpha")

다음 MapReduce 쿼리는 1마일보다 큰 세 개 이상의 기능이 있는 모든 상태 목록을 반환합니다.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => pair._2.getItem) .filter(pair => pair.get("elev_in_ft").getN != null) .filter(pair => Integer.parseInt(pair.get("elev_in_ft").getN) > 5280) .groupBy(pair => (pair.get("state_alpha").getS, pair.get("feature_class").getS)) .filter(pair => pair._2.size >= 3) .map(pair => (pair._1._1, pair._1._2, pair._2.size)) .sortBy(pair => (pair._1, pair._2)) .toDF("state_alpha", "feature_class", "count")
Hive

이전 단계에서 생성한 DynamoDB 테이블에서 데이터를 쿼리하려면 DynamoDB 테이블에서 데이터 쿼리의 지침을 따릅니다.

이전 단계에서 생성한 DynamoDB 테이블에서 데이터를 쿼리하기 위해 Spark SQL 또는 Spark MapReduce API를 사용할 수 있습니다.

예 - Spark SQL을 사용하여 DynamoDB 테이블 쿼리

다음 Spark SQL 쿼리는 모든 기능 유형의 목록을 사전순으로 반환합니다.

val dataFrame = sparkSession.sql("SELECT DISTINCT feature_class \ FROM ddb_features \ ORDER BY feature_class;")

다음 Spark SQL 쿼리는 문자 M으로 시작하는 모든 레이크의 목록을 반환합니다.

val dataFrame = sparkSession.sql("SELECT feature_name, state_alpha \ FROM ddb_features \ WHERE feature_class = 'Lake' \ AND feature_name LIKE 'M%' \ ORDER BY feature_name;")

다음 Spark SQL 쿼리는 1마일보다 큰 세 개 이상의 기능이 있는 모든 상태 목록을 반환합니다.

val dataFrame = sparkSession.dql("SELECT state_alpha, feature_class, COUNT(*) \ FROM ddb_features \ WHERE elev_in_ft > 5280 \ GROUP by state_alpha, feature_class \ HAVING COUNT(*) >= 3 \ ORDER BY state_alpha, feature_class;")
예 - Spark MapReduce API를 사용하여 DynamoDB 테이블 쿼리

다음 MapReduce 쿼리는 모든 기능 유형의 목록을 사전순으로 반환합니다.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .map(pair => pair._2.get("feature_class").getS) .distinct() .sortBy(value => value) .toDF("feature_class")

다음 MapReduce 쿼리는 문자 M으로 시작하는 모든 레이크의 목록을 반환합니다.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => (pair._1, pair._2.getItem)) .filter(pair => "Lake".equals(pair._2.get("feature_class").getS)) .filter(pair => pair._2.get("feature_name").getS.startsWith("M")) .map(pair => (pair._2.get("feature_name").getS, pair._2.get("state_alpha").getS)) .sortBy(_._1) .toDF("feature_name", "state_alpha")

다음 MapReduce 쿼리는 1마일보다 큰 세 개 이상의 기능이 있는 모든 상태 목록을 반환합니다.

val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable]) .map(pair => pair._2.getItem) .filter(pair => pair.get("elev_in_ft").getN != null) .filter(pair => Integer.parseInt(pair.get("elev_in_ft").getN) > 5280) .groupBy(pair => (pair.get("state_alpha").getS, pair.get("feature_class").getS)) .filter(pair => pair._2.size >= 3) .map(pair => (pair._1._1, pair._1._2, pair._2.size)) .sortBy(pair => (pair._1, pair._2)) .toDF("state_alpha", "feature_class", "count")
프라이버시사이트 이용 약관쿠키 기본 설정
© 2025, Amazon Web Services, Inc. 또는 계열사. All rights reserved.