AWS Glue for Spark에서 ETL에 대한 연결 유형 및 옵션 - AWS Glue

AWS Glue for Spark에서 ETL에 대한 연결 유형 및 옵션

AWS Glue for Spark에서 다양한 PySpark와 Scala 메서드 및 변환은 connectionType 파라미터를 사용하여 연결 유형을 지정합니다. connectionOptions 또는 options 파라미터를 사용하여 연결 옵션을 지정합니다.

connectionType 파라미터는 다음 표에 표시된 값을 사용할 수 있습니다. 각 유형에 대한 연관된 connectionOptions(또는 options) 파라미터 값은 다음 섹션에 설명되어 있습니다. 별도의 언급이 없으면 이 파라미터는 연결이 소스 또는 싱크로 사용될 때 적용됩니다.

연결 옵션을 설정하고 사용하는 방법을 보여주는 샘플 코드는 각 연결 유형별 홈페이지를 참조하세요.

connectionType 연결 대상
dynamodb Amazon DynamoDB 데이터베이스
kinesis Amazon Kinesis Data Streams
s3 Amazon S3
documentdb Amazon DocumentDB(MongoDB와 호환) 데이터베이스
OpenSearch Amazon OpenSearch Service.
redshift Amazon Redshift 데이터베이스
kafka Kafka 또는 Amazon Managed Streaming for Apache Kafka
azurecosmos NoSQL용 Azure Cosmos.
azuresql Azure SQL.
bigquery Google BigQuery.
mongodb MongoDB Atlas를 포함한 MongoDB 데이터베이스.
sqlserver Microsoft SQL Server 데이터베이스(JDBC 연결 참조)
mysql MySQL 데이터베이스(JDBC 연결 참조)
oracle Oracle Database(JDBC 연결 참조)
postgresql PostgreSQL 데이터베이스(JDBC 연결 참조)
saphana SAP HANA.
snowflake Snowflake 데이터 레이크
Teradata Teradata Vantage.
vertica Vertica.
custom.* Spark, Athena 또는 JDBC 데이터 원본(사용자 정의 및 AWS Marketplace 연결 유형 값 참조)
marketplace.* Spark, Athena 또는 JDBC 데이터 원본(사용자 정의 및 AWS Marketplace 연결 유형 값 참조)

Spark용 AWS Glue 5.0의 ETL에 대한 DataFrame 옵션

DataFrame은 이름이 지정된 열로 구성되는 데이터세트이며, 테이블과 비슷한 기능적 스타일(맵/줄임/필터 등) 작업과 SQL 작업(선택, 계획, 집계)을 지원합니다.

Glue에서 지원하는 데이터 소스에 대한 DataFrame을 생성하려면 다음이 필요합니다.

  • 데이터 소스 커넥터 ClassName

  • 데이터 소스 연결 Options

마찬가지로 Glue에서 지원하는 데이터 싱크에 DataFrame을 작성하려면 동일한 작업이 필요합니다.

  • 데이터 싱크 커넥터 ClassName

  • 데이터 싱크 연결 Options

작업 북마크와 같은 AWS Glue 기능 및 connectionName과 같은 DynamicFrame 옵션은 DataFrame에서 지원되지 않습니다. DataFrame 및 지원되는 작업에 대한 자세한 내용은 DataFrame용 Spark 설명서를 참조하세요.

커넥터 ClassName 지정

데이터 소스/싱크의 ClassName을 지정하려면 .format 옵션을 사용하여 데이터 소스/싱크를 정의하는 해당 커넥터 ClassName을 제공합니다.

JDBC 커넥터

JDBC 커넥터의 경우 .format 옵션의 값으로 jdbc를 지정하고 driver 옵션에 JDBC 드라이버 ClassName을 제공합니다.

df = spark.read.format("jdbc").option("driver", "<DATA SOURCE JDBC DRIVER CLASSNAME>")... df.write.format("jdbc").option("driver", "<DATA SINK JDBC DRIVER CLASSNAME>")...

다음 표에는 DataFrames용 AWS Glue에서 지원되는 데이터 소스의 JDBC 드라이버 ClassName이 나열되어 있습니다.

데이터 소스 Driver ClassName
PostgreSQL org.postgresql.Driver
Oracle oracle.jdbc.driver.OracleDriver
SQLServer com.microsoft.sqlserver.jdbc.SQLServerDriver
MySQL com.mysql.jdbc.Driver
SAPHana com.sap.db.jdbc.Driver
Teradata com.teradata.jdbc.TeraDriver
Spark 커넥터

Spark 커넥터의 경우 커넥터의 ClassName.format 옵션 값으로 지정합니다.

df = spark.read.format("<DATA SOURCE CONNECTOR CLASSNAME>")... df.write.format("<DATA SINK CONNECTOR CLASSNAME>")...

다음 표에는 DataFrames용 AWS Glue에서 지원되는 데이터 소스의 Spark 커넥터 ClassName이 나열되어 있습니다.

데이터 소스 ClassName
MongoDB/DocumentDB glue.spark.mongodb
Redshift io.github.spark_redshift_community.spark.redshift
AzureCosmos cosmos.oltp
AzureSQL com.microsoft.sqlserver.jdbc.spark
BigQuery com.google.cloud.spark.bigquery
OpenSearch org.opensearch.spark.sql
Snowflake net.snowflake.spark.snowflake
Vertica com.vertica.spark.datasource.VerticaSource

연결 옵션 지정

데이터 소스/싱크에 대한 연결의 Options를 지정하려면 .option(<KEY>, <VALUE>)을 사용하여 개별 옵션을 제공하거나 .options(<MAP>)를 사용하여 여러 옵션을 키-값 맵으로 제공합니다.

각 데이터 소스/싱크는 자체 연결 Options 세트를 지원합니다. 사용 가능한 Options에 대한 자세한 내용은 다음 표에 나열된 특정 데이터 소스/싱크 Spark 커넥터의 퍼블릭 설명서를 참조하세요.

예시

다음 예에서는 PostgreSQL에서 읽고 SnowFlake에 씁니다.

Python

예제:

from awsglue.context import GlueContext from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() dataSourceClassName = "jdbc" dataSourceOptions = { "driver": "org.postgresql.Driver", "url": "<url>", "user": "<user>", "password": "<password>", "dbtable": "<dbtable>", } dataframe = spark.read.format(className).options(**options).load() dataSinkClassName = "net.snowflake.spark.snowflake" dataSinkOptions = { "sfUrl": "<url>", "sfUsername": "<username>", "sfPassword": "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" } dataframe.write.format(dataSinkClassName).options(**dataSinkOptions).save()
Scala

예제:

import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().getOrCreate() val dataSourceClassName = "jdbc" val dataSourceOptions = Map( "driver" -> "org.postgresql.Driver", "url" -> "<url>", "user" -> "<user>", "password" -> "<password>", "dbtable" -> "<dbtable>" ) val dataframe = spark.read.format(dataSourceClassName).options(dataSourceOptions).load() val dataSinkClassName = "net.snowflake.spark.snowflake" val dataSinkOptions = Map( "sfUrl" -> "<url>", "sfUsername" -> "<username>", "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" ) dataframe.write.format(dataSinkClassName).options(dataSinkOptions).save()

사용자 정의 및 AWS Marketplace 연결 유형 값

여기에는 다음이 포함됩니다.

  • "connectionType": "marketplace.athena": Amazon Athena 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Marketplace의 커넥터를 사용합니다.

  • "connectionType": "marketplace.spark": Apache Spark 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Marketplace의 커넥터를 사용합니다.

  • "connectionType": "marketplace.jdbc": JDBC 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Marketplace의 커넥터를 사용합니다.

  • "connectionType": "custom.athena": Amazon Athena 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Glue Studio에 업로드하는 사용자 지정 커넥터를 사용합니다.

  • "connectionType": "custom.spark": Apache Spark 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Glue Studio에 업로드하는 사용자 지정 커넥터를 사용합니다.

  • "connectionType": "custom.jdbc": JDBC 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Glue Studio에 업로드하는 사용자 지정 커넥터를 사용합니다.

custom.jdbc 또는 marketplace.jdbc 유형에 대한 연결 옵션

  • className - 문자열, 필수, 드라이버 클래스 이름입니다.

  • connectionName - 문자열, 필수, 커넥터와 연결된 연결 이름입니다.

  • url- 문자열, 필수, 데이터 원본에 대한 연결을 구축하는 데 사용되는 자리 표시자(${})가 있는 JDBC URL입니다. 자리 표시자 ${secretKey}는 AWS Secrets Manager에서 같은 이름의 보안 암호로 대체됩니다. URL 구성에 대한 자세한 내용은 데이터 스토어 설명서를 참조하세요.

  • secretId 또는 user/password – 문자열, 필수, URL에 대한 자격 증명을 검색하는 데 사용됩니다.

  • dbTable 또는 query - 문자열, 필수, 데이터를 가져올 테이블 또는 SQL 쿼리입니다. dbTable 또는 query을 지정할 수 있지만 둘 다 함께 지정할 수는 없습니다.

  • partitionColumn- 문자열, 선택 사항, 파티셔닝에 사용되는 정수 열의 이름입니다. 이 옵션은 lowerBound, upperBoundnumPartitions에 포함되는 경우에만 작동합니다. 이 옵션은 Spark SQL JDBC 리더에서와 같은 방식으로 작동합니다. 자세한 내용은 Apache Spark SQL, DataFrames and Datasets GuideJDBC To Other Databases를 참조하세요.

    lowerBoundupperBound 값은 테이블의 행을 필터링하는 것이 아니라 파티션 스트라이드를 결정하는 데 사용됩니다. 테이블의 모든 행이 분할되어 반환됩니다.

    참고

    테이블 이름 대신 쿼리를 사용하는 경우 쿼리가 지정된 분할 조건에서 작동하는지 확인해야 합니다. 예:

    • 쿼리 포맷이 "SELECT col1 FROM table1"이면 파티션 열을 사용하는 쿼리 끝에 WHERE 절을 추가하여 쿼리를 테스트합니다.

    • 쿼리 포맷이 "SELECT col1 FROM table1 WHERE col2=val"이면 AND와 파티션 열을 사용하는 표현식으로 WHERE 절을 확장하여 쿼리를 테스트합니다.

  • lowerBound - 정수, 선택 사항, 파티션 스트라이드를 결정하는 데 사용되는 partitionColumn의 최소값입니다.

  • upperBound - 정수, 선택 사항, 파티션 스트라이드를 결정하는 데 사용되는 partitionColumn의 최대값입니다.

  • numPartitions- 정수, 선택 사항, 파티션 수입니다. 이 값은 lowerBound(포함) 및 upperBound(배타)와 함께 partitionColumn을 분할하는 데 사용되는 생성된 WHERE 절 표현에 대한 파티션 스트라이드를 형성합니다.

    중요

    파티션이 너무 많으면 외부 데이터베이스 시스템에 문제가 발생할 수 있으므로 파티션 수에 주의합니다.

  • filterPredicate - 문자열, 선택 사항, 소스에서 데이터를 필터링하기 위한 추가 조건 절입니다. 예:

    BillingCity='Mountain View'

    테이블 이름 대신 쿼리를 사용하는 경우 쿼리가 지정된 filterPredicate에서 작동하는지 검증해야 합니다. 예:

    • 쿼리 포맷이 "SELECT col1 FROM table1"이면 필터 조건자를 사용하는 쿼리 끝에 WHERE 절을 추가하여 쿼리를 테스트합니다.

    • 쿼리 포맷이 "SELECT col1 FROM table1 WHERE col2=val"이면 ANDWHERE 절을 확장하고 필터 조건자를 사용하는 표현식을 사용하여 쿼리를 테스트합니다.

  • dataTypeMapping – 딕셔너리, 선택 사항, JDBC 데이터 유형에서 Glue 데이터 유형으로의 매핑을 구축하는 사용자 정의 데이터 유형 매핑입니다. 예를 들어 "dataTypeMapping":{"FLOAT":"STRING"} 옵션은 드라이버의 ResultSet.getString() 메서드를 호출하여 JDBC 유형 FLOAT의 데이터 필드를 Java String 유형으로 매핑하고 이를 AWS Glue 레코드를 구축하는 데 사용합니다. ResultSet 객체는 각 드라이버에 의해 구현되므로 동작은 사용하는 드라이버에 따라 다릅니다. 드라이버가 변환을 수행하는 방법을 이해하려면 JDBC 드라이버에 대한 설명서를 참조하세요.

  • 현재 지원되는 AWS Glue 데이터 유형은 다음과 같습니다.

    • DATE

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    지원되는 JDBC 데이터 유형은 Java8 java.sql.types입니다.

    기본 데이터 유형 매핑(JDBC에서 AWS Glue로)은 다음과 같습니다.

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    옵션 dataTypeMapping과 함께 사용자 정의 데이터 유형 매핑을 사용하는 경우 기본 데이터 유형 매핑을 재정의할 수 있습니다. dataTypeMapping 옵션에 나열된 JDBC 데이터 유형만 영향을 받습니다. 기본 매핑은 다른 모든 JDBC 데이터 유형에 사용됩니다. 필요한 경우 추가 JDBC 데이터 유형에 대한 매핑을 추가할 수 있습니다. JDBC 데이터 유형이 기본 매핑이나 사용자 지정 매핑에 포함되지 않은 경우 데이터 유형은 기본값으로 AWS Glue STRING 데이터 유형으로 변환됩니다.

다음 Python 코드 예제는 AWS Marketplace JDBC 드라이버를 사용하여 JDBC 데이터베이스에서 읽는 방법을 보여줍니다. 데이터베이스에서 읽고 S3 위치에 쓰는 방법을 보여줍니다.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

custom.athena 또는 marketplace.athena 유형에 대한 연결 옵션

  • className - 문자열, 필수, 드라이버 클래스 이름입니다. Athena-CloudWatch 커넥터를 사용하는 경우 이 파라미터 값은 클래스 이름의 접두사(예: "com.amazonaws.athena.connectors")입니다. Athena-CloudWatch 커넥터는 메타데이터 핸들러와 레코드 핸들러의 두 가지 클래스로 구성됩니다. 여기에 공통 접두사를 제공하면 API가 해당 접두사를 기반으로 올바른 클래스를 로드합니다.

  • tableName - 문자열, 필수, 읽을 CloudWatch 로그 스트림의 이름입니다. 이 코드 조각은 특수 보기 이름 all_log_streams을 사용합니다. 즉, 반환된 동적 데이터 프레임에는 로그 그룹의 모든 로그 스트림 데이터가 포함됩니다.

  • schemaName - 문자열, 필수, 읽을 CloudWatch 로그 그룹의 이름입니다. 예: /aws-glue/jobs/output.

  • connectionName - 문자열, 필수, 커넥터와 연결된 연결 이름입니다.

이 커넥터에 대한 추가 옵션은 GitHub의 Amazon Athena CloudWatch 커넥터 README를 참조하세요.

다음 Python 코드 예제는 AWS Marketplace 커넥터를 사용하여 Athena 데이터 스토어에서 읽는 방법을 보여줍니다. Athena에서 읽고 S3 위치에 쓰는 방법을 보여줍니다.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

custom.spark 또는 marketplace.spark 유형에 대한 연결 옵션

  • className - 문자열, 필수, 커넥터 클래스 이름입니다.

  • secretId - 문자열, 선택 사항, 커넥터 연결에 대한 자격 증명을 검색하는 데 사용됩니다.

  • connectionName - 문자열, 필수, 커넥터와 연결된 연결 이름입니다.

  • 다른 옵션은 데이터 스토어에 따라 다릅니다. 예를 들어 OpenSearch 구성 옵션은 Apache Hadoop용 Elasticsearch 설명서에 설명된 대로 접두사 es로 시작합니다. Snowflake에 대한 Spark 연결은 Connecting to Snowflake 가이드의 Using the Spark Connector에 설명된 대로 sfUsersfPassword와 같은 옵션을 사용합니다.

다음 Python 코드 예제는 marketplace.spark 연결을 사용하여 OpenSearch 데이터 스토어에서 읽는 방법을 보여줍니다.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

일반 옵션

이 섹션의 옵션은 connection_options로 제공되지만 특별히 하나의 커넥터에만 적용되지는 않습니다.

다음 파라미터는 보통 북마크를 구성할 때 사용됩니다. Amazon S3 또는 JDBC 워크플로에 적용될 수도 있습니다. 자세한 내용은 작업 북마크 사용 단원을 참조하십시오.

  • jobBookmarkKeys - 열 이름의 배열입니다.

  • jobBookmarkKeysSortOrder - 정렬 순서에 따라 값을 비교하는 방법을 정의하는 문자열입니다. 유효한 값: "asc", "desc".

  • useS3ListImplementation - Amazon S3 버킷 콘텐츠를 나열할 때 메모리 성능을 관리하는 데 사용됩니다. 자세한 내용은 Optimize memory management in AWS Glue를 참조하세요.