기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
AWS Glue Scala DynamicFrame 클래스
패키지: com.amazonaws.services.glue
class DynamicFrame extends Serializable with Logging (
val glueContext : GlueContext,
_records : RDD[DynamicRecord],
val name : String = s"",
val transformationContext : String = DynamicFrame.UNDEFINED,
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0,
prevErrors : => Long = 0,
errorExpr : => Unit = {} )
DynamicFrame
은 자기 기술형 DynamicRecord 객체의 분산형 모음입니다.
DynamicFrame
은 ETL(extract, transform, load) 작업에 유연한 데이터 모델을 제공할 수 있게끔 설계되어 있습니다. 스키마를 생성할 필요가 없고, 복잡하거나 일관되지 않은 값과 유형을 가진 데이터를 읽고 변환할 때 사용할 수 있습니다. 스키마가 필요한 작업에만 '온 디맨드(필요에 따라)'로 스키마를 계산할 수 있습니다.
DynamicFrame
은 데이터 정리 및 ETL에 필요한 다양한 변환을 지원합니다. 또한 DataFrame이 제공하는 많은 분석 작업과 기존 코드를 통합할 수 있도록 SparkSQL DataFrame 변환을 지원합니다.
다음 파라미터는 DynamicFrame
을 구성하는 수많은 AWS Glue 변환에서 공유됩니다.
transformationContext
- 이DynamicFrame
의 식별자입니다.transformationContext
는 실행 간 지속되는 작업 북마크 상태에 대한 키로 사용됩니다.callSite
- 오류 보고에 필요한 컨텍스트 정보를 제공합니다. Python에서 호출 할 때 자동으로 이 값이 설정됩니다.stageThreshold
- 예외가 발생하기 전에 이DynamicFrame
계산에 허용되는 오류 레코드의 최대 개수입니다(이전DynamicFrame
에 존재하는 레코드 제외).totalThreshold
- 예외가 발생하기 전 전체 오류 레코드의 최대 개수입니다(이전 프레임의 레코드 포함).
val errorsCount
val errorsCount
이 DynamicFrame
의 오류 레코드 개수입니다. 여기에는 이전 작업의 오류가 포함됩니다.
def applyMapping
def applyMapping( mappings : Seq[Product4[String, String, String, String]],
caseSensitive : Boolean = true,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
mappings
- 새DynamicFrame
을 구성하는 매핑 시퀀스입니다.caseSensitive
- 소스 열을 대소문자를 구분해서 처리할지 여부입니다. 이것을 false로 설정하면 AWS Glue Data Catalog 같이 대/소문자를 구분하지 않는 스토어 통합에 도움이 됩니다.
매핑 시퀀스에 따라 열을 선택, 프로젝트, 캐스트 합니다.
각 매핑은 소스 열과 유형 및 대상 열과 유형으로 구성되어 있습니다. 매핑을 4개 튜플(source_path
, source_type
, target_path
, target_type
) 또는 동일한 정보가 있는 MappingSpec 객체로 지정할 수 있습니다.
간단한 프로젝션과 캐스팅에 매핑을 사용하는 것 외에 '.
'(마침표)가 있는 경로의 구성 요소를 분리해 필드를 중첩하거나 중첩을 해제할 때도 매핑을 사용할 수 있습니다.
예를 들어 DynamicFrame
에 다음 스키마가 있다고 가정해보겠습니다.
{{{
root
|-- name: string
|-- age: int
|-- address: struct
| |-- state: string
| |-- zip: int
}}}
다음 호출을 통해 state
및 zip
필드의 중첩을 해제할 수 있습니다.
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("address.state", "string", "state", "string"),
("address.zip", "int", "zip", "int")))
}}}
결과 스키마는 다음과 같습니다.
{{{
root
|-- name: string
|-- age: int
|-- state: string
|-- zip: int
}}}
또한 applyMapping
을 사용하여 열을 다시 중첩시킬 수 있습니다. 예를 들어 다음은 이전 변환을 반전시킨 다음에 대상에 address
라는 이름의 구조체를 생성합니다.
{{{
df.applyMapping(
Seq(("name", "string", "name", "string"),
("age", "int", "age", "int"),
("state", "string", "address.state", "string"),
("zip", "int", "address.zip", "int")))
}}}
'.
'(마침표) 문자가 있는 필드 이름은 백틱(``
)을 인용 부호로 사용할 수 있습니다.
참고
현재는 applyMapping
방법을 사용하여 어레이 아래에 중첩된 열을 매핑할 수 없습니다.
def assertErrorThreshold
def assertErrorThreshold : Unit
계산을 강제 실행하고 오류 레코드의 개수가 stageThreshold
및 totalThreshold
미만인지 확인하는 작업입니다. 두 조건 중 하나가 충족되지 않는 경우 예외가 발생합니다.
def count
lazy
def count
이 DynamicFrame
의 요소 개수를 반환합니다.
def dropField
def dropField( path : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
지정한 열이 제거된 새 DynamicFrame
을 반환합니다.
def dropFields
def dropFields( fieldNames : Seq[String], // The column names to drop.
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
지정한 열이 제거된 새 DynamicFrame
을 반환합니다.
이 방법을 사용하여 어레이 내부의 열을 포함해 중첩된 열을 삭제할 수 있지만, 지정 어레이 요소를 없앨 수는 없습니다.
def dropNulls
def dropNulls( transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0 )
null 열이 모두 제거된 새 DynamicFrame
을 반환합니다.
참고
NullType
유형의 열만 제거합니다. 다른 열의 개별 null 값은 제거되거나 수정되지 않습니다.
def errorsAsDynamicFrame
def errorsAsDynamicFrame
이 DynamicFrame
의 오류 레코드를 포함하는 새 DynamicFrame
을 반환합니다.
def filter
def filter( f : DynamicRecord => Boolean,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
함수 'f
'이 true
를 반환하는 레코드만 포함하는 새로운 DynamicFrame
을 생성합니다. 필터 함수 'f
'는 입력 레코드를 바꾸지 않습니다.
def getName
def getName : String
이 DynamicFrame
의 이름을 반환합니다.
def getNumPartitions
def getNumPartitions
이 DynamicFrame
의 파티션 개수를 반환합니다.
def getSchemaIfComputed
def getSchemaIfComputed : Option[Schema]
계산이 완료된 스키마를 반환합니다. 스키마 계산이 완료되지 않은 경우 데이터를 스캔하지 않습니다.
def isSchemaComputed
def isSchemaComputed : Boolean
이 DynamicFrame
에 대해 스키마가 계산되었으면 true
를 반환하고 그렇지 않으면 false
를 반환합니다. 이 방법이 false를 반환할 경우 schema
방법을 호출하려면 이 DynamicFrame
의 레코드를 한 번 더 전달해야 합니다.
def javaToPython
def javaToPython : JavaRDD[Array[Byte]]
def join
def join( keys1 : Seq[String],
keys2 : Seq[String],
frame2 : DynamicFrame,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
keys1
- 이DynamicFrame
에서 조인에 사용할 수 있는 열입니다.keys2
-frame2
에서 조인에 사용할 수 있는 열입니다.keys1
과 길이가 같아야 합니다.frame2
-DynamicFrame
조인 기준입니다.
지정된 키를 사용하여 frame2
로 동등 조인을 수행한 결과를 반환합니다.
def map
def map( f : DynamicRecord => DynamicRecord,
errorMsg : String = "",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
이 DynamicFrame
의 각 레코드에 지정된 함수 'f
'를 적용하여 생성된 새로운 DynamicFrame
을 반환합니다.
이 방법은 레코드가 바뀌지 않도록 각 레코드를 복사한 후에 지정된 함수를 적용합니다. 매핑 함수가 특정 레코드에 대해 예외를 발생시킨 경우, 이 레코드를 오류로 표시하고 스택 트레이스를 오류 레코드의 열로 저장합니다.
def mergeDynamicFrames
def mergeDynamicFrames( stageDynamicFrame: DynamicFrame, primaryKeys: Seq[String], transformationContext: String = "",
options: JsonOptions = JsonOptions.empty, callSite: CallSite = CallSite("Not provided"),
stageThreshold: Long = 0, totalThreshold: Long = 0): DynamicFrame
stageDynamicFrame
- 병합할 스테이징DynamicFrame
입니다.primaryKeys
- 소스 및 스테이징DynamicFrame
의 레코드와 일치시킬 기본 키 필드 목록입니다.transformationContext
- 현재 변환에 대한 메타데이터를 검색하는 데 사용되는 고유한 문자열입니다(선택 사항).options
- 이 변환에 대한 추가 정보를 제공하는 JSON 이름-값 페어 문자열입니다.callSite
- 오류 보고에 필요한 컨텍스트 정보를 제공하는 데 사용합니다.stageThreshold
-Long
입니다. 오류가 발생되는 지정된 변환의 오류 수입니다.totalThreshold
-Long
입니다. 오류가 발생되는 이 변환의 총 오류 수입니다.
레코드를 식별하기 위해, 지정된 기본 키를 기반으로 이 DynamicFrame
을 스테이징 DynamicFrame
과 병합합니다. 중복 레코드(기본 키가 동일한 레코드)는 중복 제거되지 않습니다. 스테이징 프레임에 일치하는 레코드가 없으면 모든 레코드(중복 레코드 포함)가 소스에서 유지됩니다. 스테이징 프레임에 일치하는 레코드가 있으면 스테이징 프레임의 레코드가 AWS Glue의 소스에 있는 레코드를 덮어씁니다.
다음과 같은 경우 반환된 DynamicFrame
에 레코드 A가 포함되어 있습니다.
A
가 소스 프레임과 스테이징 프레임 모두에 있는 경우에는 스테이징 프레임의A
가 반환됩니다.A
가 소스 테이블에 있고A.primaryKeys
가stagingDynamicFrame
에 없는 경우에는A
는 스테이징 테이블에서 업데이트되지 않습니다.
소스 프레임과 스테이징 프레임이 동일한 스키마를 가질 필요는 없습니다.
val mergedFrame: DynamicFrame = srcFrame.mergeDynamicFrames(stageFrame, Seq("id1", "id2"))
def printSchema
def printSchema : Unit
이 DynamicFrame
의 스키마를 사람이 읽을 수 있는 형식의 stdout
으로 인쇄합니다.
def recomputeSchema
def recomputeSchema : Schema
스키마 재계산을 강제 실행합니다. 이를 위해 데이터를 스캔해야 하지만, 현재 스키마의 일부 필드가 데이터에 존재하지 않는 경우에는 스키마를 줄여야 할 수도 있습니다.
재계산한 스키마를 반환합니다.
def relationalize
def relationalize( rootTableName : String,
stagingPath : String,
options : JsonOptions = JsonOptions.empty,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
rootTableName
- 출력에서 기본DynamicFrame
에 사용하는 이름. 이를 접두사로 시작하는 어레이를 피벗하여 생성되는DynamicFrame
입니다.stagingPath
- 중간 데이터 쓰기를 위한 Amazon Simple Storage Service(Amazon S3) 경로입니다.options
- 관계화 옵션 및 구성입니다. 현재 사용되지 않습니다.
모든 중첩된 구조와 피벗 어레이를 별개 테이블로 평면화합니다.
이 작업을 사용하여 크게 중첩된 데이터를 관계형 데이터베이스가 처리하도록 준비할 수 있습니다. 중첩된 구조를 unnest 변환과 동일한 방식으로 평면화합니다. 여기에 더해, 어레이를 각 어레이 요소가 행이 되는 별개 테이블로 피벗합니다. 예를 들어 DynamicFrame
에 다음 데이터가 있다고 가정해보겠습니다.
{"name": "Nancy", "age": 47, "friends": ["Fred", "Lakshmi"]}
{"name": "Stephanie", "age": 28, "friends": ["Yao", "Phil", "Alvin"]}
{"name": "Nathan", "age": 54, "friends": ["Nicolai", "Karen"]}
다음 코드를 실행합니다.
{{{ df.relationalize("people", "s3:/my_bucket/my_path", JsonOptions.empty) }}}
그러면 두 개 테이블이 생성됩니다. 첫 번째 테이블 이름은 "people"이며, 다음이 포함되어 있습니다.
{{{ {"name": "Nancy", "age": 47, "friends": 1} {"name": "Stephanie", "age": 28, "friends": 2} {"name": "Nathan", "age": 54, "friends": 3) }}}
여기에서는 친구 어레이가 자동 생성된 조인 키로 교체되어 있습니다. 다음 내용이 포함된 people.friends
라는 이름의 별개 테이블이 생성됩니다.
{{{ {"id": 1, "index": 0, "val": "Fred"} {"id": 1, "index": 1, "val": "Lakshmi"} {"id": 2, "index": 0, "val": "Yao"} {"id": 2, "index": 1, "val": "Phil"} {"id": 2, "index": 2, "val": "Alvin"} {"id": 3, "index": 0, "val": "Nicolai"} {"id": 3, "index": 1, "val": "Karen"} }}}
이 테이블에서 'id
'는 어레이 요소의 출처인 레코드를 식별하는 조인 키이며, 'index
'는 원본 어레이에서 그 위치를 나타내고, 'val
'는 실제 어레이 항목입니다.
relationalize
방법은 이 프로세스를 모든 어레이에 반복 적용하여 생성되는 DynamicFrame
의 시퀀스를 반환합니다.
참고
AWS Glue 라이브러리는 자동으로 새 테이블에 대한 조인 키를 생성합니다. 작업 실행 간 조인 키가 고유하도록 만들려면 작업 북마크를 활성화해야 합니다.
def renameField
def renameField( oldName : String,
newName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
oldName
- 열의 원래 이름입니다.newName
- 열의 새 이름입니다.
이름이 바뀐 지정 필드로 새 DynamicFrame
을 반환합니다.
이 방법을 사용하여 중첩된 필드의 이름을 바꿀 수 있습니다. 예를 들어 주소 구조체의 state
라는 이름을 state_code
로 바꿉니다.
{{{ df.renameField("address.state", "address.state_code") }}}
def repartition
def repartition( numPartitions : Int,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
numPartitions
파티션이 있는 새 DynamicFrame
을 반환합니다.
def resolveChoice
def resolveChoice( specs : Seq[Product2[String, String]] = Seq.empty[ResolveSpec],
choiceOption : Option[ChoiceOption] = None,
database : Option[String] = None,
tableName : Option[String] = None,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
choiceOption
- 사양 시퀀스에 열거되지 않는 모든ChoiceType
열에 적용되는 작업입니다.database
-match_catalog
작업에 사용할 Data Catalog 데이터베이스입니다.tableName
-match_catalog
작업에 사용할 Data Catalog 테이블입니다.
하나 이상의 ChoiceType
을 보다 구체적인 유형으로 대체하여 새로운 DynamicFrame
을 반환합니다.
resolveChoice
를 사용하는 방법은 두 가지입니다. 첫 번째는 지정 열의 시퀀스와 확인 방법을 지정하는 방법입니다. (열과 작업)쌍으로 구성된 튜플로 지정합니다.
가능한 작업은 다음과 같습니다.
cast:type
- 모든 값을 지정된 유형으로 캐스트하는 시도를 합니다.make_cols
- 각 유형을columnName_type
이라는 이름을 가진 열로 변환합니다.make_struct
- 열을 각 유형에 대한 키가 있는 구조체로 변환합니다.project:type
- 지정된 유형의 값만 유지합니다.
resolveChoice
의 또 다른 모드는 모든 ChoiceType
에 대해 단 하나의 해결책을 지정하는 것입니다. 실행 전에 ChoiceType
의 완전한 목록을 모르는 경우에 사용할 수 있습니다. 이 모드는 위에서 나열한 작업 외에 다음 작업을 지원합니다.
match_catalog
- 각ChoiceType
을 지정된 카탈로그 테이블의 해당 유형에 캐스팅해봅니다.
예:
int로 캐스팅을 해서 user.id
열을 확인하고, address
필드에는 구조체만 유지합니다.
{{{ df.resolveChoice(specs = Seq(("user.id", "cast:int"), ("address", "project:struct"))) }}}
각 선택을 개별 열로 변환하여 전체 ChoiceType
을 확인합니다.
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("make_cols"))) }}}
지정된 카탈로그 테이블의 유형으로 캐스팅하여 전체 ChoiceType
을 확인합니다.
{{{ df.resolveChoice(choiceOption = Some(ChoiceOption("match_catalog")), database = Some("my_database"), tableName = Some("my_table")) }}}
def schema
def schema : Schema
이 DynamicFrame
의 스키마를 반환합니다.
반환되는 스키마에는 이 DynamicFrame
의 레코드에 있는 모든 필드가 있습니다. 단, 많지는 않지만 추가 필드가 포함되는 경우도 있을 수 있습니다. unnest 방법을 사용하여 이 DynamicFrame
의 레코드에 따라 스키마를 "줄일" 수 있습니다.
def selectField
def selectField( fieldName : String,
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
단일 필드를 DynamicFrame
으로 반환합니다.
def selectFields
def selectFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
paths
- 선택할 열 이름의 시퀀스입니다.
지정한 열이 포함된 새 DynamicFrame
을 반환합니다.
참고
최상위 열에만 selectFields
방법을 사용할 수 있습니다. applyMapping 방법을 사용하여 중첩된 열을 선택할 수 있습니다.
def show
def show( numRows : Int = 20 ) : Unit
numRows
- 인쇄할 행의 개수입니다.
이 DynamicFrame
의 행을 JSON 형식으로 인쇄합니다.
Def simplifyDDBJson
AWS Glue DynamoDB 내보내기 커넥터를 사용하여 DynamoDB 내보내기를 수행하면 특정 중첩 구조의 JSON 파일이 생성됩니다. 자세한 내용은 데이터 객체를 참조하세요. simplifyDDBJson
이러한 유형의 데이터에 대한 DynamicFrame의 중첩 열을 단순화하고 단순화된 새로운 DynamicFrame을 반환합니다. 목록 유형에 여러 유형이 있거나 맵 유형이 포함된 경우 목록의 요소는 단순화되지 않습니다. 이 메서드는 DynamoDB 내보내기 JSON 형식의 데이터만 지원합니다. 다른 종류의 데이터에 대해 유사한 변경을 수행하려면 unnest
를 고려하세요.
def simplifyDDBJson() : DynamicFrame
이 메서드에서는 파라미터를 사용하지 않습니다.
입력 예
DynamoDB 내보내기로 생성된 다음 스키마를 고려해 보세요.
root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean
예제 코드
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContextimport scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "
ddbTableARN
", "dynamodb.s3.bucket" -> "exportBucketLocation
", "dynamodb.s3.prefix" -> "exportBucketPrefix
", "dynamodb.s3.bucketOwner" -> "exportBucketAccountID
", )) ).getDynamicFrame() val simplified = dynamicFrame.simplifyDDBJson() simplified.printSchema() Job.commit() } }
simplifyDDBJson
변환은 이를 다음과 같이 간소화합니다.
root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
def spigot
def spigot( path : String,
options : JsonOptions = new JsonOptions("{}"),
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
동일한 레코드를 반환하지만 부수적으로 레코드의 하위 집합을 작성하는 패스스루 변환입니다.
path
-s3://bucket//path
형식으로 출력을 쓸 Amazon S3의 경로입니다.options
- 샘플링 동작을 설명하는 선택 사항인JsonOptions
맵입니다.
이것과 동일한 레코드가 포함된 DynamicFrame
을 반환합니다.
path
로 지정한 위치에 임의의 레코드 100개를 작성하는 것이 기본값입니다. options
맵을 사용하여 이 동작을 사용자 지정할 수 있습니다. 유효한 키에는 다음이 포함되어 있습니다.
topk
- 작성할 레코드의 총 개수를 지정합니다. 기본값은 100입니다.prob
- 개별 레코드의 포함 확률(십진수)을 지정합니다. 기본값은 1.
예를 들어 다음 호출은 20퍼센트의 확률로 각 레코드를 선택하고 200개의 레코드를 작성한 후에 중단하는 방법으로 데이터 세트를 샘플링합니다.
{{{ df.spigot("s3://my_bucket/my_path", JsonOptions(Map("topk" -> 200, "prob" -> 0.2))) }}}
def splitFields
def splitFields( paths : Seq[String],
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided", ""),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : Seq[DynamicFrame]
paths
- 첫DynamicFrame
에 포함될 경로입니다.
두 개 DynamicFrame
의 시퀀스를 반환합니다. 첫 번째 DynamicFrame
에는 지정 경로가, 두 번째에는 나머지 열 전부가 포함됩니다.
예
이 예제에서는 AWS Glue Data Catalog의 legislators
데이터베이스에 있는 persons
테이블에서 생성된 DynamicFrame을 사용하고 두 개의 DynamicFrame으로 분할합니다. 지정된 필드는 첫 번째 DynamicFrame으로 이동하고, 나머지 필드는 두 번째 DynamicFrame으로 이동합니다. 그런 다음, 예제는 결과에서 첫 번째 DynamicFrame을 선택합니다.
val InputFrame = glueContext.getCatalogSource(database="legislators", tableName="persons", transformationContext="InputFrame").getDynamicFrame() val SplitField_collection = InputFrame.splitFields(paths=Seq("family_name", "name", "links.note", "links.url", "gender", "image", "identifiers.scheme", "identifiers.identifier", "other_names.lang", "other_names.note", "other_names.name"), transformationContext="SplitField_collection") val ResultFrame = SplitField_collection(0)
def splitRows
def splitRows( paths : Seq[String],
values : Seq[Any],
operators : Seq[String],
transformationContext : String,
callSite : CallSite,
stageThreshold : Long,
totalThreshold : Long
) : Seq[DynamicFrame]
열을 상수에 비교하는 조건자에 따라 행을 분할합니다.
paths
- 비교에 사용할 열입니다.values
- 비교에 사용할 상수 값입니다.operators
- 비교에 사용할 연산자입니다.
두 개 DynamicFrame
의 시퀀스를 반환합니다. 첫 번째에는 조건자가 true, 두 번째에는 조건자가 false인 행이 포함됩니다.
조건자는 다음 세 개의 시퀀스를 사용하여 지정됩니다. 'paths
'에는 (중첩될 수 있는) 열 이름이 포함되고, 'values
'에는 비교할 상수 값이 포함되며, 'operators
'에는 비교에 사용할 연산자가 포함됩니다. 세 가지 시퀀스는 모두 길이가 같아야 합니다. n
번째 연산자를 사용하여 n
번째 열과 n
번째 값을 비교합니다.
각 연산자는 "!=
", "=
", "<=
", "<
", ">=
" 또는 ">
" 중 하나여야 합니다.
예를 들어 다음 호출은 첫 번째 출력 프레임에는 65세 이상의 미국인에 대한 레코드가 포함되고 두 번째에는 나머지 레코드 전부가 포함되도록 DynamicFrame
을 분할합니다.
{{{ df.splitRows(Seq("age", "address.country"), Seq(65, "USA"), Seq(">=", "=")) }}}
def stageErrorsCount
def stageErrorsCount
이 DynamicFrame
을 계산하는 동안 생성된 오류 레코드의 개수를 반환합니다. 이 DynamicFrame
에 입력으로 전달된 이전 작업의 오류는 제외됩니다.
def toDF
def toDF( specs : Seq[ResolveSpec] = Seq.empty[ResolveSpec] ) : DataFrame
이 DynamicFrame
을 스키마와 레코드가 동일한 Apache Spark SQL DataFrame
으로 변환합니다.
참고
DataFrame
은 ChoiceType
을 지원하지 않으므로 이 방법은 ChoiceType
열을 StructType
로 자동 변환합니다. 선택할 수 있는 확인(해결) 방법에 대한 자세한 내용은 resolveChoice 단원을 참조하십시오.
def unbox
def unbox( path : String,
format : String,
optionString : String = "{}",
transformationContext : String = "",
callSite : CallSite = CallSite("Not provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
path
- 구문 분석할 열입니다. 문자열 또는 이진수여야 합니다.format
- 구문 분석에 사용할 포맷입니다.optionString
- CSV 구분자 같은 포맷으로 전달되는 옵션입니다.
지정 형식에 따라 포함된 문자열이나 이진수 열을 구문 분석합니다. 구문 분석한 열은 원래 열 이름을 가진 구조체 아래 중첩됩니다.
예를 들어 JSON 열이 포함된 CSV 파일이 있다고 가정해보겠습니다.
name, age, address Sally, 36, {"state": "NE", "city": "Omaha"} ...
최초 구문 분석 후에 다음 스키마를 갖는 DynamicFrame
이 생깁니다.
{{{ root |-- name: string |-- age: int |-- address: string }}}
주소 열에 unbox
를 호출해 지정 구성 요소를 구문 분석할 수 있습니다.
{{{ df.unbox("address", "json") }}}
그러면 다음 스키마를 갖는 DynamicFrame
이 생깁니다.
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
def unnest
def unnest( transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0
) : DynamicFrame
중첩된 구조가 모두 평면화된 새 DynamicFrame
이 반환됩니다. 이름은 '.
'(마침표) 문자를 사용하여 생성합니다.
예를 들어 DynamicFrame
에 다음 스키마가 있다고 가정해보겠습니다.
{{{ root |-- name: string |-- age: int |-- address: struct | |-- state: string | |-- city: string }}}
다음 호출은 주소 구조체의 중첩을 해제합니다.
{{{ df.unnest() }}}
결과 스키마는 다음과 같습니다.
{{{ root |-- name: string |-- age: int |-- address.state: string |-- address.city: string }}}
이 방법도 어레이 내부에 중첩된 구조체의 중첩을 해제합니다. 그러나 기록을 위해 이러한 필드는 포함되는 어레이의 이름과 ".val
"이 이름 앞에 붙습니다.
def unnestDDBJson
unnestDDBJson(transformationContext : String = "",
callSite : CallSite = CallSite("Not Provided"),
stageThreshold : Long = 0,
totalThreshold : Long = 0): DynamicFrame
DynamoDB JSON 구조의 DynamicFrame
에 있는 중첩된 열을 한정해서 중첩 해제하고, 중첩되지 않은 새 DynamicFrame
을 반환합니다. 구조체 유형 배열인 열은 중첩 해제되지 않습니다. 이 변환은 일반적인 unnest
변환과는 다르게 작동하는 특정 유형의 중첩 해제 변환이며, 데이터가 이미 DynamoDB JSON 구조에 있어야 합니다. 자세한 내용은 DynamoDB JSON을 참조하세요.
예를 들어, DynamoDB JSON 구조를 사용하여 내보내기를 읽는 스키마는 다음과 같을 수 있습니다.
root |-- Item: struct | |-- ColA: struct | | |-- S: string | |-- ColB: struct | | |-- S: string | |-- ColC: struct | | |-- N: string | |-- ColD: struct | | |-- L: array | | | |-- element: null
unnestDDBJson()
변환은 이 구조를 다음과 같이 변환합니다.
root |-- ColA: string |-- ColB: string |-- ColC: string |-- ColD: array | |-- element: null
다음 코드 예제에서는 AWS Glue DynamoDB 내보내기 커넥터를 사용하고, DynamoDB JSON unnest를 호출하고, 파티션 수를 인쇄하는 방법을 보여줍니다.
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> "<test_source>", "dynamodb.s3.bucket" -> "<bucket name>", "dynamodb.s3.prefix" -> "<bucket prefix>", "dynamodb.s3.bucketOwner" -> "<account_id of bucket>", )) ).getDynamicFrame() val unnested = dynamicFrame.unnestDDBJson() print(unnested.getNumPartitions()) Job.commit() } }
def withFrameSchema
def withFrameSchema( getSchema : () => Schema ) : DynamicFrame
getSchema
- 사용할 스키마를 반환하는 함수입니다. 0이라는 파라미터 함수로 지정을 하면 비용이 많이 드는 계산을 연기할 수 있습니다.
이 DynamicFrame
의 스키마를 지정된 값으로 설정합니다. 비용이 많이 드는 스키마 재계산을 피하기 위해 주로 내부에서 사용합니다. 통과된 스키마에는 데이터에 존재하는 모든 열이 포함되어 있어야 합니다.
def withName
def withName( name : String ) : DynamicFrame
name
- 사용할 새 이름입니다.
새 이름이 있는 이 DynamicFrame
의 복사본을 반환합니다.
def withTransformationContext
def withTransformationContext( ctx : String ) : DynamicFrame
지정된 변환 컨텍스트가 있는 이 DynamicFrame
의 복사본을 반환합니다.