AWS Glue Scala DynamicFrame 클래스 - AWS Glue

기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.

AWS Glue Scala DynamicFrame 클래스

패키지:   com.amazonaws.services.glue

class DynamicFrame extends Serializable with Logging ( val glueContext : GlueContext, _records : RDD[DynamicRecord], val name : String = s"", val transformationContext : String = DynamicFrame.UNDEFINED, callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0, prevErrors : => Long = 0, errorExpr : => Unit = {} )

DynamicFrame은 자기 기술형 DynamicRecord 객체의 분산형 모음입니다.

DynamicFrame은 ETL(extract, transform, load) 작업에 유연한 데이터 모델을 제공할 수 있게끔 설계되어 있습니다. 스키마를 생성할 필요가 없고, 복잡하거나 일관되지 않은 값과 유형을 가진 데이터를 읽고 변환할 때 사용할 수 있습니다. 스키마가 필요한 작업에만 '온 디맨드(필요에 따라)'로 스키마를 계산할 수 있습니다.

DynamicFrame은 데이터 정리 및 ETL에 필요한 다양한 변환을 지원합니다. 또한 DataFrame이 제공하는 많은 분석 작업과 기존 코드를 통합할 수 있도록 SparkSQL DataFrame 변환을 지원합니다.

다음 파라미터는 DynamicFrame을 구성하는 수많은 AWS Glue 변환에서 공유됩니다.

  • transformationContext - 이 DynamicFrame의 식별자입니다. transformationContext는 실행 간 지속되는 작업 북마크 상태에 대한 키로 사용됩니다.

  • callSite - 오류 보고에 필요한 컨텍스트 정보를 제공합니다. Python에서 호출 할 때 자동으로 이 값이 설정됩니다.

  • stageThreshold - 예외가 발생하기 전에 이 DynamicFrame 계산에 허용되는 오류 레코드의 최대 개수입니다(이전 DynamicFrame에 존재하는 레코드 제외).

  • totalThreshold - 예외가 발생하기 전 전체 오류 레코드의 최대 개수입니다(이전 프레임의 레코드 포함).

val errorsCount

val errorsCount

DynamicFrame의 오류 레코드 개수입니다. 여기에는 이전 작업의 오류가 포함됩니다.

def applyMapping

def applyMapping( mappings : Seq[Product4[String, String, String, String]], caseSensitive : Boolean = true, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • mappings - 새 DynamicFrame을 구성하는 매핑 시퀀스입니다.

  • caseSensitive - 소스 열을 대소문자를 구분해서 처리할지 여부입니다. 이것을 false로 설정하면 AWS Glue Data Catalog 같이 대/소문자를 구분하지 않는 스토어 통합에 도움이 됩니다.

매핑 시퀀스에 따라 열을 선택, 프로젝트, 캐스트 합니다.

각 매핑은 소스 열과 유형 및 대상 열과 유형으로 구성되어 있습니다. 매핑을 4개 튜플(source_path, source_type, target_path, target_type) 또는 동일한 정보가 있는 MappingSpec 객체로 지정할 수 있습니다.

간단한 프로젝션과 캐스팅에 매핑을 사용하는 것 외에 '.'(마침표)가 있는 경로의 구성 요소를 분리해 필드를 중첩하거나 중첩을 해제할 때도 매핑을 사용할 수 있습니다.

예를 들어 DynamicFrame에 다음 스키마가 있다고 가정해보겠습니다.

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- zip: int }}}

다음 호출을 통해 statezip 필드의 중첩을 해제할 수 있습니다.

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("address.state", "string", "state", "string"), ("address.zip", "int", "zip", "int"))) }}}

결과 스키마는 다음과 같습니다.

{{{ root |-- name: string |-- age: int |-- state: string |-- zip: int }}}

또한 applyMapping을 사용하여 열을 다시 중첩시킬 수 있습니다. 예를 들어 다음은 이전 변환을 반전시킨 다음에 대상에 address라는 이름의 구조체를 생성합니다.

{{{ df.applyMapping( Seq(("name", "string", "name", "string"), ("age", "int", "age", "int"), ("state", "string", "address.state", "string"), ("zip", "int", "address.zip", "int"))) }}}

'.'(마침표) 문자가 있는 필드 이름은 백틱(``)을 인용 부호로 사용할 수 있습니다.

참고

현재는 applyMapping 방법을 사용하여 어레이 아래에 중첩된 열을 매핑할 수 없습니다.

def assertErrorThreshold

def assertErrorThreshold : Unit

계산을 강제 실행하고 오류 레코드의 개수가 stageThresholdtotalThreshold 미만인지 확인하는 작업입니다. 두 조건 중 하나가 충족되지 않는 경우 예외가 발생합니다.

def count

lazy def count

DynamicFrame의 요소 개수를 반환합니다.

def dropField

def dropField( path : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

지정한 열이 제거된 새 DynamicFrame을 반환합니다.

def dropFields

def dropFields( fieldNames : Seq[String], // The column names to drop. transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

지정한 열이 제거된 새 DynamicFrame을 반환합니다.

이 방법을 사용하여 어레이 내부의 열을 포함해 중첩된 열을 삭제할 수 있지만, 지정 어레이 요소를 없앨 수는 없습니다.

def dropNulls

def dropNulls( transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 )

null 열이 모두 제거된 새 DynamicFrame을 반환합니다.

참고

NullType 유형의 열만 제거합니다. 다른 열의 개별 null 값은 제거되거나 수정되지 않습니다.

def errorsAsDynamicFrame

def errorsAsDynamicFrame

DynamicFrame의 오류 레코드를 포함하는 새 DynamicFrame을 반환합니다.

def filter

def filter( f : DynamicRecord => Boolean, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

함수 'f'이 true를 반환하는 레코드만 포함하는 새로운 DynamicFrame을 생성합니다. 필터 함수 'f'는 입력 레코드를 바꾸지 않습니다.

def getName

def getName : String

DynamicFrame의 이름을 반환합니다.

def getNumPartitions

def getNumPartitions

DynamicFrame의 파티션 개수를 반환합니다.

def getSchemaIfComputed

def getSchemaIfComputed : Option[Schema]

계산이 완료된 스키마를 반환합니다. 스키마 계산이 완료되지 않은 경우 데이터를 스캔하지 않습니다.

def isSchemaComputed

def isSchemaComputed : Boolean

DynamicFrame에 대해 스키마가 계산되었으면 true를 반환하고 그렇지 않으면 false를 반환합니다. 이 방법이 false를 반환할 경우 schema 방법을 호출하려면 이 DynamicFrame의 레코드를 한 번 더 전달해야 합니다.

def javaToPython

def javaToPython : JavaRDD[Array[Byte]]

def join

def join( keys1 : Seq[String], keys2 : Seq[String], frame2 : DynamicFrame, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • keys1 - 이 DynamicFrame에서 조인에 사용할 수 있는 열입니다.

  • keys2 - frame2에서 조인에 사용할 수 있는 열입니다. keys1과 길이가 같아야 합니다.

  • frame2 - DynamicFrame 조인 기준입니다.

지정된 키를 사용하여 frame2로 동등 조인을 수행한 결과를 반환합니다.

def map

def map( f : DynamicRecord => DynamicRecord, errorMsg : String = "", transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

DynamicFrame의 각 레코드에 지정된 함수 'f'를 적용하여 생성된 새로운 DynamicFrame을 반환합니다.

이 방법은 레코드가 바뀌지 않도록 각 레코드를 복사한 후에 지정된 함수를 적용합니다. 매핑 함수가 특정 레코드에 대해 예외를 발생시킨 경우, 이 레코드를 오류로 표시하고 스택 트레이스를 오류 레코드의 열로 저장합니다.

def mergeDynamicFrames

def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "", options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"), stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
  • stageDynamicFrame - 병합할 스테이징 DynamicFrame입니다.

  • primaryKeys - 소스 및 스테이징 DynamicFrame의 레코드와 일치시킬 기본 키 필드 목록입니다.

  • transformationContext - 현재 변환에 대한 메타데이터를 검색하는 데 사용되는 고유한 문자열입니다(선택 사항).

  • options - 이 변환에 대한 추가 정보를 제공하는 JSON 이름-값 페어 문자열입니다.

  • callSite - 오류 보고에 필요한 컨텍스트 정보를 제공하는 데 사용합니다.

  • stageThreshold - Long입니다. 오류가 발생되는 지정된 변환의 오류 수입니다.

  • totalThreshold - Long입니다. 오류가 발생되는 이 변환의 총 오류 수입니다.

레코드를 식별하기 위해, 지정된 기본 키를 기반으로 이 DynamicFrame을 스테이징 DynamicFrame과 병합합니다. 중복 레코드(기본 키가 동일한 레코드)는 중복 제거되지 않습니다. 스테이징 프레임에 일치하는 레코드가 없으면 모든 레코드(중복 레코드 포함)가 소스에서 유지됩니다. 스테이징 프레임에 일치하는 레코드가 있으면 스테이징 프레임의 레코드가 AWS Glue의 소스에 있는 레코드를 덮어씁니다.

다음과 같은 경우 반환된 DynamicFrame에 레코드 A가 포함되어 있습니다.

  1. A가 소스 프레임과 스테이징 프레임 모두에 있는 경우에는 스테이징 프레임의 A가 반환됩니다.

  2. A가 소스 테이블에 있고 A.primaryKeysstagingDynamicFrame에 없는 경우에는 A는 스테이징 테이블에서 업데이트되지 않습니다.

소스 프레임과 스테이징 프레임이 동일한 스키마를 가질 필요는 없습니다.

val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))

def printSchema

def printSchema : Unit

DynamicFrame의 스키마를 사람이 읽을 수 있는 형식의 stdout으로 인쇄합니다.

def recomputeSchema

def recomputeSchema : Schema

스키마 재계산을 강제 실행합니다. 이를 위해 데이터를 스캔해야 하지만, 현재 스키마의 일부 필드가 데이터에 존재하지 않는 경우에는 스키마를 줄여야 할 수도 있습니다.

재계산한 스키마를 반환합니다.

def relationalize

def relationalize( rootTableName : String, stagingPath : String, options : JsonOptions = JsonOptions.empty, transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • rootTableName - 출력에서 기본 DynamicFrame에 사용하는 이름. 이를 접두사로 시작하는 어레이를 피벗하여 생성되는 DynamicFrame입니다.

  • stagingPath - 중간 데이터 쓰기를 위한 Amazon Simple Storage Service(Amazon S3) 경로입니다.

  • options - 관계화 옵션 및 구성입니다. 현재 사용되지 않습니다.

모든 중첩된 구조와 피벗 어레이를 별개 테이블로 평면화합니다.

이 작업을 사용하여 크게 중첩된 데이터를 관계형 데이터베이스가 처리하도록 준비할 수 있습니다. 중첩된 구조를 unnest 변환과 동일한 방식으로 평면화합니다. 여기에 더해, 어레이를 각 어레이 요소가 행이 되는 별개 테이블로 피벗합니다. 예를 들어 DynamicFrame에 다음 데이터가 있다고 가정해보겠습니다.

{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]} {"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]} {"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}

다음 코드를 실행합니다.

{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}

그러면 두 개 테이블이 생성됩니다. 첫 번째 테이블 이름은 "people"이며, 다음이 포함되어 있습니다.

{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}

여기에서는 친구 어레이가 자동 생성된 조인 키로 교체되어 있습니다. 다음 내용이 포함된 people.friends라는 이름의 별개 테이블이 생성됩니다.

{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}

이 테이블에서 'id'는 어레이 요소의 출처인 레코드를 식별하는 조인 키이며, 'index'는 원본 어레이에서 그 위치를 나타내고, 'val'는 실제 어레이 항목입니다.

relationalize 방법은 이 프로세스를 모든 어레이에 반복 적용하여 생성되는 DynamicFrame의 시퀀스를 반환합니다.

참고

AWS Glue 라이브러리는 자동으로 새 테이블에 대한 조인 키를 생성합니다. 작업 실행 간 조인 키가 고유하도록 만들려면 작업 북마크를 활성화해야 합니다.

def renameField

def renameField( oldName : String, newName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • oldName - 열의 원래 이름입니다.

  • newName - 열의 새 이름입니다.

이름이 바뀐 지정 필드로 새 DynamicFrame을 반환합니다.

이 방법을 사용하여 중첩된 필드의 이름을 바꿀 수 있습니다. 예를 들어 주소 구조체의 state라는 이름을 state_code로 바꿉니다.

{{{ df.renameField("address.state", "address.state_code") }}}

def repartition

def repartition( numPartitions : Int, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

numPartitions 파티션이 있는 새 DynamicFrame을 반환합니다.

def resolveChoice

def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec], choiceOption : Option[ChoiceOption] = None, database : Option[String] = None, tableName : Option[String] = None, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • choiceOption - 사양 시퀀스에 열거되지 않는 모든 ChoiceType 열에 적용되는 작업입니다.

  • database - match_catalog 작업에 사용할 Data Catalog 데이터베이스입니다.

  • tableName - match_catalog 작업에 사용할 Data Catalog 테이블입니다.

하나 이상의 ChoiceType을 보다 구체적인 유형으로 대체하여 새로운 DynamicFrame을 반환합니다.

resolveChoice를 사용하는 방법은 두 가지입니다. 첫 번째는 지정 열의 시퀀스와 확인 방법을 지정하는 방법입니다. (열과 작업)쌍으로 구성된 튜플로 지정합니다.

가능한 작업은 다음과 같습니다.

  • cast:type - 모든 값을 지정된 유형으로 캐스트하는 시도를 합니다.

  • make_cols - 각 유형을 columnName_type이라는 이름을 가진 열로 변환합니다.

  • make_struct - 열을 각 유형에 대한 키가 있는 구조체로 변환합니다.

  • project:type - 지정된 유형의 값만 유지합니다.

resolveChoice의 또 다른 모드는 모든 ChoiceType에 대해 단 하나의 해결책을 지정하는 것입니다. 실행 전에 ChoiceType의 완전한 목록을 모르는 경우에 사용할 수 있습니다. 이 모드는 위에서 나열한 작업 외에 다음 작업을 지원합니다.

  • match_catalog - 각 ChoiceType을 지정된 카탈로그 테이블의 해당 유형에 캐스팅해봅니다.

:

int로 캐스팅을 해서 user.id 열을 확인하고, address 필드에는 구조체만 유지합니다.

{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}

각 선택을 개별 열로 변환하여 전체 ChoiceType을 확인합니다.

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}

지정된 카탈로그 테이블의 유형으로 캐스팅하여 전체 ChoiceType을 확인합니다.

{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}

def schema

def schema : Schema

DynamicFrame의 스키마를 반환합니다.

반환되는 스키마에는 이 DynamicFrame의 레코드에 있는 모든 필드가 있습니다. 단, 많지는 않지만 추가 필드가 포함되는 경우도 있을 수 있습니다. unnest 방법을 사용하여 이 DynamicFrame의 레코드에 따라 스키마를 "줄일" 수 있습니다.

def selectField

def selectField( fieldName : String, transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

단일 필드를 DynamicFrame으로 반환합니다.

def selectFields

def selectFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • paths - 선택할 열 이름의 시퀀스입니다.

지정한 열이 포함된 새 DynamicFrame을 반환합니다.

참고

최상위 열에만 selectFields 방법을 사용할 수 있습니다. applyMapping 방법을 사용하여 중첩된 열을 선택할 수 있습니다.

def show

def show( numRows : Int = 20 ) : Unit
  • numRows - 인쇄할 행의 개수입니다.

DynamicFrame의 행을 JSON 형식으로 인쇄합니다.

Def simplifyDDBJson

AWS Glue DynamoDB 내보내기 커넥터를 사용하여 DynamoDB 내보내기를 수행하면 특정 중첩 구조의 JSON 파일이 생성됩니다. 자세한 내용은 데이터 객체를 참조하세요. simplifyDDBJson 이러한 유형의 데이터에 대한 DynamicFrame의 중첩 열을 단순화하고 단순화된 새로운 DynamicFrame을 반환합니다. 목록 유형에 여러 유형이 있거나 맵 유형이 포함된 경우 목록의 요소는 단순화되지 않습니다. 이 메서드는 DynamoDB 내보내기 JSON 형식의 데이터만 지원합니다. 다른 종류의 데이터에 대해 유사한 변경을 수행하려면 unnest를 고려하세요.

def simplifyDDBJson() : DynamicFrame

이 메서드에서는 파라미터를 사용하지 않습니다.

입력 예

DynamoDB 내보내기로 생성된 다음 스키마를 고려해 보세요.

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

예제 코드

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "ddbTableARN", "dynamodb.s3.bucket" -> "exportBucketLocation", "dynamodb.s3.prefix" -> "exportBucketPrefix", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }

simplifyDDBJson 변환은 이를 다음과 같이 간소화합니다.

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null

def spigot

def spigot( path : String, options : JsonOptions = new JsonOptions("{}"), transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

동일한 레코드를 반환하지만 부수적으로 레코드의 하위 집합을 작성하는 패스스루 변환입니다.

  • path - s3://bucket//path 형식으로 출력을 쓸 Amazon S3의 경로입니다.

  • options - 샘플링 동작을 설명하는 선택 사항인 JsonOptions 맵입니다.

이것과 동일한 레코드가 포함된 DynamicFrame을 반환합니다.

path로 지정한 위치에 임의의 레코드 100개를 작성하는 것이 기본값입니다. options 맵을 사용하여 이 동작을 사용자 지정할 수 있습니다. 유효한 키에는 다음이 포함되어 있습니다.

  • topk - 작성할 레코드의 총 개수를 지정합니다. 기본값은 100입니다.

  • prob - 개별 레코드의 포함 확률(십진수)을 지정합니다. 기본값은 1.

예를 들어 다음 호출은 20퍼센트의 확률로 각 레코드를 선택하고 200개의 레코드를 작성한 후에 중단하는 방법으로 데이터 세트를 샘플링합니다.

{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}

def splitFields

def splitFields( paths : Seq[String], transformationContext : String = "", callSite : CallSite = CallSite("Not provided", ""), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : Seq[DynamicFrame]
  • paths - 첫 DynamicFrame에 포함될 경로입니다.

두 개 DynamicFrame의 시퀀스를 반환합니다. 첫 번째 DynamicFrame에는 지정 경로가, 두 번째에는 나머지 열 전부가 포함됩니다.

이 예제에서는 AWS Glue Data Catalog의 legislators 데이터베이스에 있는 persons 테이블에서 생성된 DynamicFrame을 사용하고 두 개의 DynamicFrame으로 분할합니다. 지정된 필드는 첫 번째 DynamicFrame으로 이동하고, 나머지 필드는 두 번째 DynamicFrame으로 이동합니다. 그런 다음, 예제는 결과에서 첫 번째 DynamicFrame을 선택합니다.

val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)

def splitRows

def splitRows( paths : Seq[String], values : Seq[Any], operators : Seq[String], transformationContext : String, callSite : CallSite, stageThreshold : Long, totalThreshold : Long ) : Seq[DynamicFrame]

열을 상수에 비교하는 조건자에 따라 행을 분할합니다.

  • paths - 비교에 사용할 열입니다.

  • values - 비교에 사용할 상수 값입니다.

  • operators - 비교에 사용할 연산자입니다.

두 개 DynamicFrame의 시퀀스를 반환합니다. 첫 번째에는 조건자가 true, 두 번째에는 조건자가 false인 행이 포함됩니다.

조건자는 다음 세 개의 시퀀스를 사용하여 지정됩니다. 'paths'에는 (중첩될 수 있는) 열 이름이 포함되고, 'values'에는 비교할 상수 값이 포함되며, 'operators'에는 비교에 사용할 연산자가 포함됩니다. 세 가지 시퀀스는 모두 길이가 같아야 합니다. n번째 연산자를 사용하여 n번째 열과 n번째 값을 비교합니다.

각 연산자는 "!=", "=", "<=", "<", ">=" 또는 ">" 중 하나여야 합니다.

예를 들어 다음 호출은 첫 번째 출력 프레임에는 65세 이상의 미국인에 대한 레코드가 포함되고 두 번째에는 나머지 레코드 전부가 포함되도록 DynamicFrame을 분할합니다.

{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq("&gt;=", "=")) }}}

def stageErrorsCount

def stageErrorsCount

DynamicFrame을 계산하는 동안 생성된 오류 레코드의 개수를 반환합니다. 이 DynamicFrame에 입력으로 전달된 이전 작업의 오류는 제외됩니다.

def toDF

def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame

DynamicFrame을 스키마와 레코드가 동일한 Apache Spark SQL DataFrame으로 변환합니다.

참고

DataFrameChoiceType을 지원하지 않으므로 이 방법은 ChoiceType 열을 StructType로 자동 변환합니다. 선택할 수 있는 확인(해결) 방법에 대한 자세한 내용은 resolveChoice 단원을 참조하십시오.

def unbox

def unbox( path : String, format : String, optionString : String = "{}", transformationContext : String = "", callSite : CallSite = CallSite("Not provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame
  • path - 구문 분석할 열입니다. 문자열 또는 이진수여야 합니다.

  • format - 구문 분석에 사용할 포맷입니다.

  • optionString - CSV 구분자 같은 포맷으로 전달되는 옵션입니다.

지정 형식에 따라 포함된 문자열이나 이진수 열을 구문 분석합니다. 구문 분석한 열은 원래 열 이름을 가진 구조체 아래 중첩됩니다.

예를 들어 JSON 열이 포함된 CSV 파일이 있다고 가정해보겠습니다.

name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...

최초 구문 분석 후에 다음 스키마를 갖는 DynamicFrame이 생깁니다.

{{{ root |-- name: string |-- age: int |-- address: string }}}

주소 열에 unbox를 호출해 지정 구성 요소를 구문 분석할 수 있습니다.

{{{ df.unbox("address", "json") }}}

그러면 다음 스키마를 갖는 DynamicFrame이 생깁니다.

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

def unnest

def unnest( transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0 ) : DynamicFrame

중첩된 구조가 모두 평면화된 새 DynamicFrame이 반환됩니다. 이름은 '.'(마침표) 문자를 사용하여 생성합니다.

예를 들어 DynamicFrame에 다음 스키마가 있다고 가정해보겠습니다.

{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}

다음 호출은 주소 구조체의 중첩을 해제합니다.

{{{ df.unnest() }}}

결과 스키마는 다음과 같습니다.

{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}

이 방법도 어레이 내부에 중첩된 구조체의 중첩을 해제합니다. 그러나 기록을 위해 이러한 필드는 포함되는 어레이의 이름과 ".val"이 이름 앞에 붙습니다.

def unnestDDBJson

unnestDDBJson(transformationContext : String = "", callSite : CallSite = CallSite("Not Provided"), stageThreshold : Long = 0, totalThreshold : Long = 0): DynamicFrame

DynamoDB JSON 구조의 DynamicFrame에 있는 중첩된 열을 한정해서 중첩 해제하고, 중첩되지 않은 새 DynamicFrame을 반환합니다. 구조체 유형 배열인 열은 중첩 해제되지 않습니다. 이 변환은 일반적인 unnest 변환과는 다르게 작동하는 특정 유형의 중첩 해제 변환이며, 데이터가 이미 DynamoDB JSON 구조에 있어야 합니다. 자세한 내용은 DynamoDB JSON을 참조하세요.

예를 들어, DynamoDB JSON 구조를 사용하여 내보내기를 읽는 스키마는 다음과 같을 수 있습니다.

root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null

unnestDDBJson() 변환은 이 구조를 다음과 같이 변환합니다.

root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null

다음 코드 예제에서는 AWS Glue DynamoDB 내보내기 커넥터를 사용하고, DynamoDB JSON unnest를 호출하고, 파티션 수를 인쇄하는 방법을 보여줍니다.

import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }

def withFrameSchema

def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
  • getSchema - 사용할 스키마를 반환하는 함수입니다. 0이라는 파라미터 함수로 지정을 하면 비용이 많이 드는 계산을 연기할 수 있습니다.

DynamicFrame의 스키마를 지정된 값으로 설정합니다. 비용이 많이 드는 스키마 재계산을 피하기 위해 주로 내부에서 사용합니다. 통과된 스키마에는 데이터에 존재하는 모든 열이 포함되어 있어야 합니다.

def withName

def withName( name : String ) : DynamicFrame
  • name - 사용할 새 이름입니다.

새 이름이 있는 이 DynamicFrame의 복사본을 반환합니다.

def withTransformationContext

def withTransformationContext( ctx : String ) : DynamicFrame

지정된 변환 컨텍스트가 있는 이 DynamicFrame의 복사본을 반환합니다.