AWS Glue for Spark에서 ETL에 대한 연결 유형 및 옵션
AWS Glue for Spark에서 다양한 PySpark와 Scala 메서드 및 변환은 connectionType
파라미터를 사용하여 연결 유형을 지정합니다. connectionOptions
또는 options
파라미터를 사용하여 연결 옵션을 지정합니다.
connectionType
파라미터는 다음 표에 표시된 값을 사용할 수 있습니다. 각 유형에 대한 연관된 connectionOptions
(또는 options
) 파라미터 값은 다음 섹션에 설명되어 있습니다. 별도의 언급이 없으면 이 파라미터는 연결이 소스 또는 싱크로 사용될 때 적용됩니다.
연결 옵션을 설정하고 사용하는 방법을 보여주는 샘플 코드는 각 연결 유형별 홈페이지를 참조하세요.
connectionType |
연결 대상 |
---|---|
dynamodb | Amazon DynamoDB 데이터베이스 |
kinesis | Amazon Kinesis Data Streams |
s3 | Amazon S3 |
documentdb | Amazon DocumentDB(MongoDB와 호환) 데이터베이스 |
OpenSearch | Amazon OpenSearch Service. |
redshift | Amazon Redshift |
kafka | Kafka |
azurecosmos | NoSQL용 Azure Cosmos. |
azuresql | Azure SQL. |
bigquery | Google BigQuery. |
mongodb | MongoDB Atlas를 포함한 MongoDB |
sqlserver | Microsoft SQL Server 데이터베이스(JDBC 연결 참조) |
mysql | MySQL |
oracle | Oracle |
postgresql | PostgreSQL |
saphana | SAP HANA. |
snowflake | Snowflake |
Teradata | Teradata Vantage. |
vertica | Vertica. |
custom.* | Spark, Athena 또는 JDBC 데이터 원본(사용자 정의 및 AWS Marketplace 연결 유형 값 참조) |
marketplace.* | Spark, Athena 또는 JDBC 데이터 원본(사용자 정의 및 AWS Marketplace 연결 유형 값 참조) |
Spark용 AWS Glue 5.0의 ETL에 대한 DataFrame 옵션
DataFrame은 이름이 지정된 열로 구성되는 데이터세트이며, 테이블과 비슷한 기능적 스타일(맵/줄임/필터 등) 작업과 SQL 작업(선택, 계획, 집계)을 지원합니다.
Glue에서 지원하는 데이터 소스에 대한 DataFrame을 생성하려면 다음이 필요합니다.
데이터 소스 커넥터
ClassName
데이터 소스 연결
Options
마찬가지로 Glue에서 지원하는 데이터 싱크에 DataFrame을 작성하려면 동일한 작업이 필요합니다.
데이터 싱크 커넥터
ClassName
데이터 싱크 연결
Options
작업 북마크와 같은 AWS Glue 기능 및 connectionName
과 같은 DynamicFrame 옵션은 DataFrame에서 지원되지 않습니다. DataFrame 및 지원되는 작업에 대한 자세한 내용은 DataFrame
커넥터 ClassName 지정
데이터 소스/싱크의 ClassName
을 지정하려면 .format
옵션을 사용하여 데이터 소스/싱크를 정의하는 해당 커넥터 ClassName
을 제공합니다.
JDBC 커넥터
JDBC 커넥터의 경우 .format
옵션의 값으로 jdbc
를 지정하고 driver
옵션에 JDBC 드라이버 ClassName
을 제공합니다.
df = spark.read.format("jdbc").option("driver", "<DATA SOURCE JDBC DRIVER CLASSNAME>")... df.write.format("jdbc").option("driver", "<DATA SINK JDBC DRIVER CLASSNAME>")...
다음 표에는 DataFrames용 AWS Glue에서 지원되는 데이터 소스의 JDBC 드라이버 ClassName
이 나열되어 있습니다.
데이터 소스 | Driver ClassName |
---|---|
PostgreSQL | org.postgresql.Driver |
Oracle | oracle.jdbc.driver.OracleDriver |
SQLServer | com.microsoft.sqlserver.jdbc.SQLServerDriver |
MySQL | com.mysql.jdbc.Driver |
SAPHana | com.sap.db.jdbc.Driver |
Teradata | com.teradata.jdbc.TeraDriver |
Spark 커넥터
Spark 커넥터의 경우 커넥터의 ClassName
을 .format
옵션 값으로 지정합니다.
df = spark.read.format("<DATA SOURCE CONNECTOR CLASSNAME>")... df.write.format("<DATA SINK CONNECTOR CLASSNAME>")...
다음 표에는 DataFrames용 AWS Glue에서 지원되는 데이터 소스의 Spark 커넥터 ClassName
이 나열되어 있습니다.
데이터 소스 | ClassName |
---|---|
MongoDB/DocumentDB | glue.spark.mongodb |
Redshift | io.github.spark_redshift_community.spark.redshift |
AzureCosmos | cosmos.oltp |
AzureSQL | com.microsoft.sqlserver.jdbc.spark |
BigQuery | com.google.cloud.spark.bigquery |
OpenSearch | org.opensearch.spark.sql |
Snowflake | net.snowflake.spark.snowflake |
Vertica | com.vertica.spark.datasource.VerticaSource |
연결 옵션 지정
데이터 소스/싱크에 대한 연결의 Options
를 지정하려면 .option(<KEY>, <VALUE>)
을 사용하여 개별 옵션을 제공하거나 .options(<MAP>)
를 사용하여 여러 옵션을 키-값 맵으로 제공합니다.
각 데이터 소스/싱크는 자체 연결 Options
세트를 지원합니다. 사용 가능한 Options
에 대한 자세한 내용은 다음 표에 나열된 특정 데이터 소스/싱크 Spark 커넥터의 퍼블릭 설명서를 참조하세요.
예시
다음 예에서는 PostgreSQL에서 읽고 SnowFlake에 씁니다.
Python
예제:
from awsglue.context import GlueContext from pyspark.sql import SparkSession spark = SparkSession.builder.getOrCreate() dataSourceClassName = "jdbc" dataSourceOptions = { "driver": "org.postgresql.Driver", "url": "<url>", "user": "<user>", "password": "<password>", "dbtable": "<dbtable>", } dataframe = spark.read.format(className).options(**options).load() dataSinkClassName = "net.snowflake.spark.snowflake" dataSinkOptions = { "sfUrl": "<url>", "sfUsername": "<username>", "sfPassword": "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" } dataframe.write.format(dataSinkClassName).options(**dataSinkOptions).save()
Scala
예제:
import org.apache.spark.sql.SparkSession val spark = SparkSession.builder().getOrCreate() val dataSourceClassName = "jdbc" val dataSourceOptions = Map( "driver" -> "org.postgresql.Driver", "url" -> "<url>", "user" -> "<user>", "password" -> "<password>", "dbtable" -> "<dbtable>" ) val dataframe = spark.read.format(dataSourceClassName).options(dataSourceOptions).load() val dataSinkClassName = "net.snowflake.spark.snowflake" val dataSinkOptions = Map( "sfUrl" -> "<url>", "sfUsername" -> "<username>", "sfPassword" -> "<password>", "sfDatabase" -> "<database>", "sfSchema" -> "<schema>", "sfWarehouse" -> "<warehouse>" ) dataframe.write.format(dataSinkClassName).options(dataSinkOptions).save()
사용자 정의 및 AWS Marketplace 연결 유형 값
여기에는 다음이 포함됩니다.
-
"connectionType": "marketplace.athena"
: Amazon Athena 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Marketplace의 커넥터를 사용합니다. -
"connectionType": "marketplace.spark"
: Apache Spark 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Marketplace의 커넥터를 사용합니다. -
"connectionType": "marketplace.jdbc"
: JDBC 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Marketplace의 커넥터를 사용합니다. -
"connectionType": "custom.athena"
: Amazon Athena 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Glue Studio에 업로드하는 사용자 지정 커넥터를 사용합니다. -
"connectionType": "custom.spark"
: Apache Spark 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Glue Studio에 업로드하는 사용자 지정 커넥터를 사용합니다. -
"connectionType": "custom.jdbc"
: JDBC 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Glue Studio에 업로드하는 사용자 지정 커넥터를 사용합니다.
custom.jdbc 또는 marketplace.jdbc 유형에 대한 연결 옵션
-
className
- 문자열, 필수, 드라이버 클래스 이름입니다. -
connectionName
- 문자열, 필수, 커넥터와 연결된 연결 이름입니다. -
url
- 문자열, 필수, 데이터 원본에 대한 연결을 구축하는 데 사용되는 자리 표시자(${}
)가 있는 JDBC URL입니다. 자리 표시자${secretKey}
는 AWS Secrets Manager에서 같은 이름의 보안 암호로 대체됩니다. URL 구성에 대한 자세한 내용은 데이터 스토어 설명서를 참조하세요. -
secretId
또는user/password
– 문자열, 필수, URL에 대한 자격 증명을 검색하는 데 사용됩니다. -
dbTable
또는query
- 문자열, 필수, 데이터를 가져올 테이블 또는 SQL 쿼리입니다.dbTable
또는query
을 지정할 수 있지만 둘 다 함께 지정할 수는 없습니다. -
partitionColumn
- 문자열, 선택 사항, 파티셔닝에 사용되는 정수 열의 이름입니다. 이 옵션은lowerBound
,upperBound
및numPartitions
에 포함되는 경우에만 작동합니다. 이 옵션은 Spark SQL JDBC 리더에서와 같은 방식으로 작동합니다. 자세한 내용은 Apache Spark SQL, DataFrames and Datasets Guide의 JDBC To Other Databases를 참조하세요. lowerBound
및upperBound
값은 테이블의 행을 필터링하는 것이 아니라 파티션 스트라이드를 결정하는 데 사용됩니다. 테이블의 모든 행이 분할되어 반환됩니다.참고
테이블 이름 대신 쿼리를 사용하는 경우 쿼리가 지정된 분할 조건에서 작동하는지 확인해야 합니다. 예:
-
쿼리 포맷이
"SELECT col1 FROM table1"
이면 파티션 열을 사용하는 쿼리 끝에WHERE
절을 추가하여 쿼리를 테스트합니다. -
쿼리 포맷이 "
SELECT col1 FROM table1 WHERE col2=val"
이면AND
와 파티션 열을 사용하는 표현식으로WHERE
절을 확장하여 쿼리를 테스트합니다.
-
-
lowerBound
- 정수, 선택 사항, 파티션 스트라이드를 결정하는 데 사용되는partitionColumn
의 최소값입니다. -
upperBound
- 정수, 선택 사항, 파티션 스트라이드를 결정하는 데 사용되는partitionColumn
의 최대값입니다. -
numPartitions
- 정수, 선택 사항, 파티션 수입니다. 이 값은lowerBound
(포함) 및upperBound
(배타)와 함께partitionColumn
을 분할하는 데 사용되는 생성된WHERE
절 표현에 대한 파티션 스트라이드를 형성합니다.중요
파티션이 너무 많으면 외부 데이터베이스 시스템에 문제가 발생할 수 있으므로 파티션 수에 주의합니다.
-
filterPredicate
- 문자열, 선택 사항, 소스에서 데이터를 필터링하기 위한 추가 조건 절입니다. 예:BillingCity='Mountain View'
테이블 이름 대신 쿼리를 사용하는 경우 쿼리가 지정된
filterPredicate
에서 작동하는지 검증해야 합니다. 예:-
쿼리 포맷이
"SELECT col1 FROM table1"
이면 필터 조건자를 사용하는 쿼리 끝에WHERE
절을 추가하여 쿼리를 테스트합니다. -
쿼리 포맷이
"SELECT col1 FROM table1 WHERE col2=val"
이면AND
로WHERE
절을 확장하고 필터 조건자를 사용하는 표현식을 사용하여 쿼리를 테스트합니다.
-
-
dataTypeMapping
– 딕셔너리, 선택 사항, JDBC 데이터 유형에서 Glue 데이터 유형으로의 매핑을 구축하는 사용자 정의 데이터 유형 매핑입니다. 예를 들어"dataTypeMapping":{"FLOAT":"STRING"}
옵션은 드라이버의ResultSet.getString()
메서드를 호출하여 JDBC 유형FLOAT
의 데이터 필드를 JavaString
유형으로 매핑하고 이를 AWS Glue 레코드를 구축하는 데 사용합니다.ResultSet
객체는 각 드라이버에 의해 구현되므로 동작은 사용하는 드라이버에 따라 다릅니다. 드라이버가 변환을 수행하는 방법을 이해하려면 JDBC 드라이버에 대한 설명서를 참조하세요. -
현재 지원되는 AWS Glue 데이터 유형은 다음과 같습니다.
-
DATE
-
STRING
-
TIMESTAMP
-
INT
-
FLOAT
-
LONG
-
BIGDECIMAL
-
BYTE
-
SHORT
-
DOUBLE
지원되는 JDBC 데이터 유형은 Java8 java.sql.types
입니다. 기본 데이터 유형 매핑(JDBC에서 AWS Glue로)은 다음과 같습니다.
-
DATE -> DATE
-
VARCHAR -> STRING
-
CHAR -> STRING
-
LONGNVARCHAR -> STRING
-
TIMESTAMP -> TIMESTAMP
-
INTEGER -> INT
-
FLOAT -> FLOAT
-
REAL -> FLOAT
-
BIT -> BOOLEAN
-
BOOLEAN -> BOOLEAN
-
BIGINT -> LONG
-
DECIMAL -> BIGDECIMAL
-
NUMERIC -> BIGDECIMAL
-
TINYINT -> SHORT
-
SMALLINT -> SHORT
-
DOUBLE -> DOUBLE
옵션
dataTypeMapping
과 함께 사용자 정의 데이터 유형 매핑을 사용하는 경우 기본 데이터 유형 매핑을 재정의할 수 있습니다.dataTypeMapping
옵션에 나열된 JDBC 데이터 유형만 영향을 받습니다. 기본 매핑은 다른 모든 JDBC 데이터 유형에 사용됩니다. 필요한 경우 추가 JDBC 데이터 유형에 대한 매핑을 추가할 수 있습니다. JDBC 데이터 유형이 기본 매핑이나 사용자 지정 매핑에 포함되지 않은 경우 데이터 유형은 기본값으로 AWS GlueSTRING
데이터 유형으로 변환됩니다. -
다음 Python 코드 예제는 AWS Marketplace JDBC 드라이버를 사용하여 JDBC 데이터베이스에서 읽는 방법을 보여줍니다. 데이터베이스에서 읽고 S3 위치에 쓰는 방법을 보여줍니다.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
custom.athena 또는 marketplace.athena 유형에 대한 연결 옵션
-
className
- 문자열, 필수, 드라이버 클래스 이름입니다. Athena-CloudWatch 커넥터를 사용하는 경우 이 파라미터 값은 클래스 이름의 접두사(예:"com.amazonaws.athena.connectors"
)입니다. Athena-CloudWatch 커넥터는 메타데이터 핸들러와 레코드 핸들러의 두 가지 클래스로 구성됩니다. 여기에 공통 접두사를 제공하면 API가 해당 접두사를 기반으로 올바른 클래스를 로드합니다. -
tableName
- 문자열, 필수, 읽을 CloudWatch 로그 스트림의 이름입니다. 이 코드 조각은 특수 보기 이름all_log_streams
을 사용합니다. 즉, 반환된 동적 데이터 프레임에는 로그 그룹의 모든 로그 스트림 데이터가 포함됩니다. -
schemaName
- 문자열, 필수, 읽을 CloudWatch 로그 그룹의 이름입니다. 예:/aws-glue/jobs/output
. -
connectionName
- 문자열, 필수, 커넥터와 연결된 연결 이름입니다.
이 커넥터에 대한 추가 옵션은 GitHub의 Amazon Athena CloudWatch 커넥터 README
다음 Python 코드 예제는 AWS Marketplace 커넥터를 사용하여 Athena 데이터 스토어에서 읽는 방법을 보여줍니다. Athena에서 읽고 S3 위치에 쓰는 방법을 보여줍니다.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
custom.spark 또는 marketplace.spark 유형에 대한 연결 옵션
-
className
- 문자열, 필수, 커넥터 클래스 이름입니다. -
secretId
- 문자열, 선택 사항, 커넥터 연결에 대한 자격 증명을 검색하는 데 사용됩니다. -
connectionName
- 문자열, 필수, 커넥터와 연결된 연결 이름입니다. -
다른 옵션은 데이터 스토어에 따라 다릅니다. 예를 들어 OpenSearch 구성 옵션은 Apache Hadoop용 Elasticsearch
설명서에 설명된 대로 접두사 es
로 시작합니다. Snowflake에 대한 Spark 연결은 Connecting to Snowflake 가이드의 Using the Spark Connector에 설명된 대로 sfUser
및sfPassword
와 같은 옵션을 사용합니다.
다음 Python 코드 예제는 marketplace.spark
연결을 사용하여 OpenSearch 데이터 스토어에서 읽는 방법을 보여줍니다.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://
<AWS endpoint>
", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>
","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
일반 옵션
이 섹션의 옵션은 connection_options
로 제공되지만 특별히 하나의 커넥터에만 적용되지는 않습니다.
다음 파라미터는 보통 북마크를 구성할 때 사용됩니다. Amazon S3 또는 JDBC 워크플로에 적용될 수도 있습니다. 자세한 내용은 작업 북마크 사용 단원을 참조하십시오.
jobBookmarkKeys
- 열 이름의 배열입니다.jobBookmarkKeysSortOrder
- 정렬 순서에 따라 값을 비교하는 방법을 정의하는 문자열입니다. 유효한 값:"asc"
,"desc"
.useS3ListImplementation
- Amazon S3 버킷 콘텐츠를 나열할 때 메모리 성능을 관리하는 데 사용됩니다. 자세한 내용은 Optimize memory management in AWS Glue를 참조하세요.