기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
Amazon EMR Serverless를 사용하여 DynamoDB에 연결
이 자습서에서는 미국 지명위원회
1단계: Amazon S3 버킷에 데이터 업로드
Amazon S3 버킷을 생성하려면 Amazon Simple Storage Service 콘솔 사용 설명서의 버킷 생성에 나온 지침을 따르세요.
에 대한 참조를 새로 생성된 버킷의 이름으로 바꿉니다. 이제 EMR Serverless 애플리케이션에서 작업을 실행할 준비가 되었습니다.amzn-s3-demo-bucket
-
다음 명령을 사용하여 샘플 데이터 아카이브(
features.zip
)를 다운로드합니다.wget https://docs.aws.amazon.com/amazondynamodb/latest/developerguide/samples/features.zip
-
아카이브에서
features.txt
파일을 추출하고 파일의 처음 몇 줄을 봅니다.unzip features.zip head features.txt
결과는 다음과 비슷합니다.
1535908|Big Run|Stream|WV|38.6370428|-80.8595469|794 875609|Constable Hook|Cape|NJ|40.657881|-74.0990309|7 1217998|Gooseberry Island|Island|RI|41.4534361|-71.3253284|10 26603|Boone Moore Spring|Spring|AZ|34.0895692|-111.410065|3681 1506738|Missouri Flat|Flat|WA|46.7634987|-117.0346113|2605 1181348|Minnow Run|Stream|PA|40.0820178|-79.3800349|1558 1288759|Hunting Creek|Stream|TN|36.343969|-83.8029682|1024 533060|Big Charles Bayou|Bay|LA|29.6046517|-91.9828654|0 829689|Greenwood Creek|Stream|NE|41.596086|-103.0499296|3671 541692|Button Willow Island|Island|LA|31.9579389|-93.0648847|98
여기에 나온 각 줄의 필드는 고유 식별자, 이름, 자연 특성 유형, 주, 위도, 경도, 고도(피트)를 나타냅니다.
-
Amazon S3에 데이터 업로드
aws s3 cp features.txt s3://
amzn-s3-demo-bucket
/features/
2단계: Hive 테이블 생성
Apache Spark 또는 Hive를 사용하여 Amazon S3에 업로드된 데이터가 포함된 새 Hive 테이블을 생성합니다.
Spark에서 Hive 테이블을 생성하려면 다음 명령을 실행합니다.
import org.apache.spark.sql.SparkSession val sparkSession = SparkSession.builder().enableHiveSupport().getOrCreate() sparkSession.sql("CREATE TABLE
hive_features
\ (feature_id BIGINT, \ feature_name STRING, \ feature_class STRING, \ state_alpha STRING, \ prim_lat_dec DOUBLE, \ prim_long_dec DOUBLE, \ elev_in_ft BIGINT) \ ROW FORMAT DELIMITED \ FIELDS TERMINATED BY '|' \ LINES TERMINATED BY '\n' \ LOCATION 's3://amzn-s3-demo-bucket
/features';")
이제 features.txt
파일의 데이터로 고유 Hive 테이블을 채워졌습니다. 데이터가 테이블에 있는지 확인하려면 다음 예제와 같이 Spark SQL 쿼리를 실행합니다.
sparkSession.sql( "SELECT state_alpha, COUNT(*) FROM
hive_features
GROUP BY state_alpha;")
3단계: DynamoDB로 데이터 복사
Spark 또는 Hive를 사용하여 데이터를 새 DynamoDB 테이블에 복사합니다.
이전 단계에서 생성한 Hive 테이블의 데이터를 DynamoDB로 복사하려면 DynamoDB로 데이터 복사의 1~3단계를 수행합니다. 그러면 새 DynamoDB 테이블(Features
)이 생성됩니다. 그리고 다음 예제와 같이 텍스트 파일에서 직접 데이터를 읽고 DynamoDB 테이블에 복사할 수 있습니다.
import com.amazonaws.services.dynamodbv2.model.AttributeValue import org.apache.hadoop.dynamodb.DynamoDBItemWritable import org.apache.hadoop.dynamodb.read.DynamoDBInputFormat import org.apache.hadoop.io.Text import org.apache.hadoop.mapred.JobConf import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object EmrServerlessDynamoDbTest { def main(args: Array[String]): Unit = { jobConf.set("dynamodb.input.tableName", "Features") jobConf.set("dynamodb.output.tableName", "Features") jobConf.set("dynamodb.region", "
region
") jobConf.set("mapred.output.format.class", "org.apache.hadoop.dynamodb.write.DynamoDBOutputFormat") jobConf.set("mapred.input.format.class", "org.apache.hadoop.dynamodb.read.DynamoDBInputFormat") val rdd = sc.textFile("s3://amzn-s3-demo-bucket
/ddb-connector/") .map(row => { val line = row.split("\\|") val item = new DynamoDBItemWritable() val elevInFt = if (line.length > 6) { new AttributeValue().withN(line(6)) } else { new AttributeValue().withNULL(true) } item.setItem(Map( "feature_id" -> new AttributeValue().withN(line(0)), "feature_name" -> new AttributeValue(line(1)), "feature_class" -> new AttributeValue(line(2)), "state_alpha" -> new AttributeValue(line(3)), "prim_lat_dec" -> new AttributeValue().withN(line(4)), "prim_long_dec" -> new AttributeValue().withN(line(5)), "elev_in_ft" -> elevInFt) .asJava) (new Text(""), item) }) rdd.saveAsHadoopDataset(jobConf) } }
4단계: DynamoDB에서 데이터 쿼리
Spark 또는 Hive를 사용하여 DynamoDB 테이블을 쿼리합니다.
이전 단계에서 생성한 DynamoDB 테이블에서 데이터를 쿼리하기 위해 Spark SQL 또는 Spark MapReduce API를 사용할 수 있습니다.
예 - Spark SQL을 사용하여 DynamoDB 테이블 쿼리
다음 Spark SQL 쿼리는 모든 기능 유형의 목록을 사전순으로 반환합니다.
val dataFrame = sparkSession.sql("SELECT DISTINCT feature_class \
FROM ddb_features \
ORDER BY feature_class;")
다음 Spark SQL 쿼리는 문자 M으로 시작하는 모든 레이크의 목록을 반환합니다.
val dataFrame = sparkSession.sql("SELECT feature_name, state_alpha \
FROM ddb_features \
WHERE feature_class = 'Lake' \
AND feature_name LIKE 'M%' \
ORDER BY feature_name;")
다음 Spark SQL 쿼리는 1마일보다 큰 세 개 이상의 기능이 있는 모든 상태 목록을 반환합니다.
val dataFrame = sparkSession.dql("SELECT state_alpha, feature_class, COUNT(*) \
FROM ddb_features \
WHERE elev_in_ft > 5280 \
GROUP by state_alpha, feature_class \
HAVING COUNT(*) >= 3 \
ORDER BY state_alpha, feature_class;")
예 - Spark MapReduce API를 사용하여 DynamoDB 테이블 쿼리
다음 MapReduce 쿼리는 모든 기능 유형의 목록을 사전순으로 반환합니다.
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
.map(pair => (pair._1, pair._2.getItem))
.map(pair => pair._2.get("feature_class").getS)
.distinct()
.sortBy(value => value)
.toDF("feature_class")
다음 MapReduce 쿼리는 문자 M으로 시작하는 모든 레이크의 목록을 반환합니다.
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
.map(pair => (pair._1, pair._2.getItem))
.filter(pair => "Lake".equals(pair._2.get("feature_class").getS))
.filter(pair => pair._2.get("feature_name").getS.startsWith("M"))
.map(pair => (pair._2.get("feature_name").getS, pair._2.get("state_alpha").getS))
.sortBy(_._1)
.toDF("feature_name", "state_alpha")
다음 MapReduce 쿼리는 1마일보다 큰 세 개 이상의 기능이 있는 모든 상태 목록을 반환합니다.
val df = sc.hadoopRDD(jobConf, classOf[DynamoDBInputFormat], classOf[Text], classOf[DynamoDBItemWritable])
.map(pair => pair._2.getItem)
.filter(pair => pair.get("elev_in_ft").getN != null)
.filter(pair => Integer.parseInt(pair.get("elev_in_ft").getN) > 5280)
.groupBy(pair => (pair.get("state_alpha").getS, pair.get("feature_class").getS))
.filter(pair => pair._2.size >= 3)
.map(pair => (pair._1._1, pair._1._2, pair._2.size))
.sortBy(pair => (pair._1, pair._2))
.toDF("state_alpha", "feature_class", "count")