AWS Glue ETL 작업을 사용하여 데이터 카탈로그에서 스키마 업데이트 및 새 파티션 추가 - AWS Glue

AWS Glue ETL 작업을 사용하여 데이터 카탈로그에서 스키마 업데이트 및 새 파티션 추가

추출, 변환 및 로드(ETL) 작업은 대상 데이터 스토어에 새 테이블 파티션을 생성할 수 있습니다. 데이터 집합 스키마는 시간이 지남에 따라 진화하면서 AWS Glue Data Catalog 스키마에서 벗어날 수 있습니다. AWS Glue 이제 ETL 작업에서 여러 기능을 사용해 ETL 스크립트 내에서 Data Catalog의 스키마 및 파티션을 업데이트할 수 있습니다. 이러한 기능을 통해 크롤러를 다시 실행할 필요 없이 Data Catalog의 ETL 작업 결과를 볼 수 있습니다.

새 파티션

AWS Glue Data Catalog의 새 파티션을 보려면 다음 중 하나를 수행할 수 있습니다.

  • 작업이 완료된 후 크롤러를 다시 실행하고, 크롤러가 완료되면 콘솔에서 새 파티션을 확인합니다.

  • 작업이 완료된 후 크롤러를 다시 실행하지 않고 콘솔에서 바로 새 파티션을 확인합니다. 다음 예제와 같이 몇 줄의 코드를 ETL 스크립트에 추가하여 이 기능을 활성화할 수 있습니다. 이 코드는 새 파티션이 생성될 때 작업 실행 중에 Data Catalog를 업데이트하는 enableUpdateCatalog 인수를 사용합니다.

방법 1

옵션 인수에서 enableUpdateCatalogpartitionKeys를 전달합니다.

Python
additionalOptions = {"enableUpdateCatalog": True} additionalOptions["partitionKeys"] = ["region", "year", "month", "day"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<target_db_name>, table_name=<target_table_name>, transformation_ctx="write_sink", additional_options=additionalOptions)
Scala
val options = JsonOptions(Map( "path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink( database = <target_db_name>, tableName = <target_table_name>, additionalOptions = options)sink.writeDynamicFrame(df)
방법 2

getSink()에서 enableUpdateCatalogpartitionKeys를 전달하고 DataSink 객체에서 setCatalogInfo()를 호출합니다.

Python
sink = glueContext.getSink( connection_type="s3", path="<S3_output_path>", enableUpdateCatalog=True, partitionKeys=["region", "year", "month", "day"]) sink.setFormat("json") sink.setCatalogInfo(catalogDatabase=<target_db_name>, catalogTableName=<target_table_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions( Map("path" -> <S3_output_path>, "partitionKeys" -> Seq("region", "year", "month", "day"), "enableUpdateCatalog" -> true)) val sink = glueContext.getSink("s3", options).withFormat("json") sink.setCatalogInfo(<target_db_name>, <target_table_name>) sink.writeDynamicFrame(df)

이제 크롤러를 다시 실행할 필요 없이 Data Catalog에서 AWS Glue ETL 작업 자체를 사용해 새 카탈로그 테이블을 생성하고, 수정된 스키마로 기존 테이블을 업데이트하고, 새 테이블 파티션을 추가할 수 있습니다.

테이블 스키마 업데이트

Data Catalog 테이블의 스키마를 덮어쓰려면 다음 중 하나를 수행할 수 있습니다.

  • 작업이 완료되면 크롤러를 다시 실행하고 크롤러가 테이블 정의도 업데이트하도록 구성되어 있는지 확인합니다. 크롤러가 완료되면 콘솔의 새 파티션과 스키마 업데이트를 확인합니다. 자세한 내용은 API를 사용해 크롤러 구성을 참조하십시오.

  • 작업이 완료된 후 크롤러를 다시 실행하지 않고 콘솔에서 바로 수정된 스키마를 확인합니다. 다음 예제와 같이 몇 줄의 코드를 ETL 스크립트에 추가하여 이 기능을 활성화할 수 있습니다. 이 코드에서는 enableUpdateCatalog가 true로, updateBehaviorUPDATE_IN_DATABASE로 설정되어 있는데, 이를 통해 작업 실행 중에 Data Catalog에서 스키마를 덮어쓰고 새 파티션을 추가하게 됩니다.

Python
additionalOptions = { "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE"} additionalOptions["partitionKeys"] = ["partition_key0", "partition_key1"] sink = glueContext.write_dynamic_frame_from_catalog(frame=last_transform, database=<dst_db_name>, table_name=<dst_tbl_name>, transformation_ctx="write_sink", additional_options=additionalOptions) job.commit()
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("partition_0", "partition_1"), "enableUpdateCatalog" -> true)) val sink = glueContext.getCatalogSink(database = nameSpace, tableName = tableName, additionalOptions = options) sink.writeDynamicFrame(df)

또한 테이블 스키마를 덮어쓰지 않고 새 파티션을 추가하려는 경우에는 updateBehavior 값을 LOG로 설정하면 됩니다. updateBehavior의 기본값은 UPDATE_IN_DATABASE이므로 명시적으로 정의하지 않으면 테이블 스키마를 덮어씁니다.

enableUpdateCatalog가 true로 설정되어 있지 않으면 updateBehavior에서 선택한 옵션에 관계없이 ETL 작업이 Data Catalog의 테이블을 업데이트하지 않습니다.

새 테이블 생성

동일한 옵션을 사용해 Data Catalog에서 새 테이블을 생성할 수도 있습니다. setCatalogInfo를 사용해 데이터베이스 및 새 테이블 이름을 지정할 수 있습니다.

Python
sink = glueContext.getSink(connection_type="s3", path="s3://path/to/data", enableUpdateCatalog=True, updateBehavior="UPDATE_IN_DATABASE", partitionKeys=["partition_key0", "partition_key1"]) sink.setFormat("<format>") sink.setCatalogInfo(catalogDatabase=<dst_db_name>, catalogTableName=<dst_tbl_name>) sink.writeFrame(last_transform)
Scala
val options = JsonOptions(Map( "path" -> outputPath, "partitionKeys" -> Seq("<partition_1>", "<partition_2>"), "enableUpdateCatalog" -> true, "updateBehavior" -> "UPDATE_IN_DATABASE")) val sink = glueContext.getSink(connectionType = "s3", connectionOptions = options).withFormat("<format>") sink.setCatalogInfo(catalogDatabase = “<dst_db_name>”, catalogTableName = “<dst_tbl_name>”) sink.writeDynamicFrame(df)

제한 사항

다음 제한 사항에 유의하십시오.

  • Amazon Simple Storage Service(Amazon S3) 대상만 지원됩니다.

  • enableUpdateCatalog 기능은 관리 테이블에서 지원되지 않습니다.

  • json, csv, avroparquet 형식만 지원됩니다.

  • parquet 분류로 테이블을 생성하거나 업데이트하려면 Dynamic Frames에 대해 AWS Glue 최적화 parquet 라이터를 사용해야 합니다. 이는 다음 중 하나를 통해 수행할 수 있습니다.

    • 카탈로그의 기존 테이블을 parquet 분류로 업데이트하는 경우 테이블을 업데이트하기 전에 테이블에서 "useGlueParquetWriter" 테이블 속성이 true로 설정되어 있어야 합니다. AWS Glue API/SDK, 콘솔 또는 Athena DDL 문을 통해 이 속성을 설정할 수 있습니다.

      AWS Glue 콘솔의 카탈로그 테이블 속성 편집 필드.

      카탈로그 테이블 속성이 설정되면 다음 코드 스니펫을 사용하여 카탈로그 테이블을 새 데이터로 업데이트할 수 있습니다.

      glueContext.write_dynamic_frame.from_catalog( frame=frameToWrite, database="dbName", table_name="tableName", additional_options={ "enableUpdateCatalog": True, "updateBehavior": "UPDATE_IN_DATABASE" } )
    • 테이블이 카탈로그에 아직 없는 경우 connection_type="s3"를 통해 스크립트에서 getSink() 메서드를 사용하여 Amazon S3에 데이터를 쓰는 작업과 함께 테이블과 해당 파티션을 카탈로그에 추가할 수 있습니다. 워크플로에 적절한 partitionKeyscompression을 제공합니다.

      s3sink = glueContext.getSink( path="s3://bucket/folder/", connection_type="s3", updateBehavior="UPDATE_IN_DATABASE", partitionKeys=[], compression="snappy", enableUpdateCatalog=True ) s3sink.setCatalogInfo( catalogDatabase="dbName", catalogTableName="tableName" ) s3sink.setFormat("parquet", useGlueParquetWriter=True) s3sink.writeFrame(frameToWrite)
    • glueparquet 형식 값은 AWS Glue parquet 작성기를 활성화하는 기존 방법입니다.

  • updateBehaviorLOG로 설정하면 DynamicFrame 스키마가 Data Catalog 테이블의 스키마에 정의된 열의 하위 집합과 동일하거나 해당 집합을 포함하고 있는 경우에만 새 파티션이 추가됩니다.

  • 분할되지 않은 테이블에는 스키마 업데이트가 지원되지 않습니다(“partitionKeys” 옵션 사용 안 함).

  • partitionKeys는 ETL 스크립트에 전달된 파라미터와 Data Catalog 테이블 스키마의 partitionKeys 간에 동일한 순서로 동일해야 합니다.

  • 이 기능은 현재 업데이트 스키마가 중첩된 테이블 업데이트/생성을 아직 지원하지 않습니다(예: 구조체 내부 배열).

자세한 내용은 Spark 스크립트 프로그래밍 단원을 참조하십시오.