기계 번역으로 제공되는 번역입니다. 제공된 번역과 원본 영어의 내용이 상충하는 경우에는 영어 버전이 우선합니다.
AWS Glue for Spark에서 ETL에 대한 연결 유형 및 옵션
AWS Glue for Spark에서 다양한 PySpark와 Scala 메서드 및 변환은 connectionType
파라미터를 사용하여 연결 유형을 지정합니다. connectionOptions
또는 options
파라미터를 사용하여 연결 옵션을 지정합니다.
connectionType
파라미터는 다음 표에 표시된 값을 사용할 수 있습니다. 각 유형에 대한 연관된 connectionOptions
(또는 options
) 파라미터 값은 다음 섹션에 설명되어 있습니다. 별도의 언급이 없으면 이 파라미터는 연결이 소스 또는 싱크로 사용될 때 적용됩니다.
연결 옵션을 설정하고 사용하는 방법을 보여주는 샘플 코드는 각 연결 유형별 홈페이지를 참조하세요.
connectionType |
연결 대상 |
---|---|
dynamodb | Amazon DynamoDB 데이터베이스 |
kinesis | Amazon Kinesis Data Streams |
s3 | Amazon S3 |
documentdb | Amazon DocumentDB(MongoDB와 호환) 데이터베이스 |
OpenSearch | Amazon OpenSearch Service. |
redshift | Amazon Redshift |
kafka | Kafka |
azurecosmos | NoSQL용 Azure Cosmos. |
azuresql | Azure SQL. |
bigquery | Google BigQuery. |
mongodb | MongoDB Atlas를 포함한 MongoDB |
sqlserver | Microsoft SQL Server 데이터베이스(JDBC 연결 참조) |
mysql | MySQL |
oracle | Oracle |
postgresql | PostgreSQL |
saphana | SAP HANA. |
snowflake | Snowflake |
Teradata | Teradata Vantage. |
vertica | Vertica. |
custom.* | Spark, Athena 또는 JDBC 데이터 원본(사용자 정의 및 AWS Marketplace 연결 유형 값 참조) |
marketplace.* | Spark, Athena 또는 JDBC 데이터 원본(사용자 정의 및 AWS Marketplace 연결 유형 값 참조) |
사용자 정의 및 AWS Marketplace 연결 유형 값
여기에는 다음이 포함됩니다.
-
"connectionType": "marketplace.athena"
: Amazon Athena 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Marketplace의 커넥터를 사용합니다. -
"connectionType": "marketplace.spark"
: Apache Spark 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Marketplace의 커넥터를 사용합니다. -
"connectionType": "marketplace.jdbc"
: JDBC 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Marketplace의 커넥터를 사용합니다. -
"connectionType": "custom.athena"
: Amazon Athena 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Glue Studio에 업로드하는 사용자 지정 커넥터를 사용합니다. -
"connectionType": "custom.spark"
: Apache Spark 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Glue Studio에 업로드하는 사용자 지정 커넥터를 사용합니다. -
"connectionType": "custom.jdbc"
: JDBC 데이터 스토어에 대한 연결을 지정합니다. 연결은 AWS Glue Studio에 업로드하는 사용자 지정 커넥터를 사용합니다.
custom.jdbc 또는 marketplace.jdbc 유형에 대한 연결 옵션
-
className
- 문자열, 필수, 드라이버 클래스 이름입니다. -
connectionName
- 문자열, 필수, 커넥터와 연결된 연결 이름입니다. -
url
- 문자열, 필수, 데이터 원본에 대한 연결을 구축하는 데 사용되는 자리 표시자(${}
)가 있는 JDBC URL입니다. 자리 표시자${secretKey}
는 AWS Secrets Manager에서 같은 이름의 보안 암호로 대체됩니다. URL 구성에 대한 자세한 내용은 데이터 스토어 설명서를 참조하세요. -
secretId
또는user/password
– 문자열, 필수, URL에 대한 자격 증명을 검색하는 데 사용됩니다. -
dbTable
또는query
- 문자열, 필수, 데이터를 가져올 테이블 또는 SQL 쿼리입니다.dbTable
또는query
을 지정할 수 있지만 둘 다 함께 지정할 수는 없습니다. -
partitionColumn
- 문자열, 선택 사항, 파티셔닝에 사용되는 정수 열의 이름입니다. 이 옵션은lowerBound
,upperBound
및numPartitions
에 포함되는 경우에만 작동합니다. 이 옵션은 Spark SQL JDBC 리더에서와 같은 방식으로 작동합니다. 자세한 내용은 Apache Spark SQL, DataFrames and Datasets Guide의 JDBC To Other Databases를 참조하세요. lowerBound
및upperBound
값은 테이블의 행을 필터링하는 것이 아니라 파티션 스트라이드를 결정하는 데 사용됩니다. 테이블의 모든 행이 분할되어 반환됩니다.참고
테이블 이름 대신 쿼리를 사용하는 경우 쿼리가 지정된 분할 조건에서 작동하는지 확인해야 합니다. 예:
-
쿼리 포맷이
"SELECT col1 FROM table1"
이면 파티션 열을 사용하는 쿼리 끝에WHERE
절을 추가하여 쿼리를 테스트합니다. -
쿼리 포맷이 "
SELECT col1 FROM table1 WHERE col2=val"
이면AND
와 파티션 열을 사용하는 표현식으로WHERE
절을 확장하여 쿼리를 테스트합니다.
-
-
lowerBound
- 정수, 선택 사항, 파티션 스트라이드를 결정하는 데 사용되는partitionColumn
의 최소값입니다. -
upperBound
- 정수, 선택 사항, 파티션 스트라이드를 결정하는 데 사용되는partitionColumn
의 최대값입니다. -
numPartitions
- 정수, 선택 사항, 파티션 수입니다. 이 값은lowerBound
(포함) 및upperBound
(배타)와 함께partitionColumn
을 분할하는 데 사용되는 생성된WHERE
절 표현에 대한 파티션 스트라이드를 형성합니다.중요
파티션이 너무 많으면 외부 데이터베이스 시스템에 문제가 발생할 수 있으므로 파티션 수에 주의합니다.
-
filterPredicate
- 문자열, 선택 사항, 소스에서 데이터를 필터링하기 위한 추가 조건 절입니다. 예:BillingCity='Mountain View'
테이블 이름 대신 쿼리를 사용하는 경우 쿼리가 지정된
filterPredicate
에서 작동하는지 검증해야 합니다. 예:-
쿼리 포맷이
"SELECT col1 FROM table1"
이면 필터 조건자를 사용하는 쿼리 끝에WHERE
절을 추가하여 쿼리를 테스트합니다. -
쿼리 포맷이
"SELECT col1 FROM table1 WHERE col2=val"
이면AND
로WHERE
절을 확장하고 필터 조건자를 사용하는 표현식을 사용하여 쿼리를 테스트합니다.
-
-
dataTypeMapping
– 딕셔너리, 선택 사항, JDBC 데이터 유형에서 Glue 데이터 유형으로의 매핑을 구축하는 사용자 정의 데이터 유형 매핑입니다. 예를 들어"dataTypeMapping":{"FLOAT":"STRING"}
옵션은 드라이버의ResultSet.getString()
메서드를 호출하여 JDBC 유형FLOAT
의 데이터 필드를 JavaString
유형으로 매핑하고 이를 AWS Glue 레코드를 구축하는 데 사용합니다.ResultSet
객체는 각 드라이버에 의해 구현되므로 동작은 사용하는 드라이버에 따라 다릅니다. 드라이버가 변환을 수행하는 방법을 이해하려면 JDBC 드라이버에 대한 설명서를 참조하세요. -
현재 지원되는 AWS Glue 데이터 유형은 다음과 같습니다.
-
DATE
-
STRING
-
TIMESTAMP
-
INT
-
FLOAT
-
LONG
-
BIGDECIMAL
-
BYTE
-
SHORT
-
DOUBLE
지원되는 JDBC 데이터 유형은 Java8 java.sql.types
입니다. 기본 데이터 유형 매핑(JDBC에서 AWS Glue로)은 다음과 같습니다.
-
DATE -> DATE
-
VARCHAR -> STRING
-
CHAR -> STRING
-
LONGNVARCHAR -> STRING
-
TIMESTAMP -> TIMESTAMP
-
INTEGER -> INT
-
FLOAT -> FLOAT
-
REAL -> FLOAT
-
BIT -> BOOLEAN
-
BOOLEAN -> BOOLEAN
-
BIGINT -> LONG
-
DECIMAL -> BIGDECIMAL
-
NUMERIC -> BIGDECIMAL
-
TINYINT -> SHORT
-
SMALLINT -> SHORT
-
DOUBLE -> DOUBLE
옵션
dataTypeMapping
과 함께 사용자 정의 데이터 유형 매핑을 사용하는 경우 기본 데이터 유형 매핑을 재정의할 수 있습니다.dataTypeMapping
옵션에 나열된 JDBC 데이터 유형만 영향을 받습니다. 기본 매핑은 다른 모든 JDBC 데이터 유형에 사용됩니다. 필요한 경우 추가 JDBC 데이터 유형에 대한 매핑을 추가할 수 있습니다. JDBC 데이터 유형이 기본 매핑이나 사용자 지정 매핑에 포함되지 않은 경우 데이터 유형은 기본값으로 AWS GlueSTRING
데이터 유형으로 변환됩니다. -
다음 Python 코드 예제는 AWS Marketplace JDBC 드라이버를 사용하여 JDBC 데이터베이스에서 읽는 방법을 보여줍니다. 데이터베이스에서 읽고 S3 위치에 쓰는 방법을 보여줍니다.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
custom.athena 또는 marketplace.athena 유형에 대한 연결 옵션
-
className
- 문자열, 필수, 드라이버 클래스 이름입니다. Athena-CloudWatch 커넥터를 사용하는 경우 이 파라미터 값은 클래스 이름의 접두사(예:"com.amazonaws.athena.connectors"
)입니다. Athena-CloudWatch 커넥터는 메타데이터 핸들러와 레코드 핸들러의 두 가지 클래스로 구성됩니다. 여기에 공통 접두사를 제공하면 API가 해당 접두사를 기반으로 올바른 클래스를 로드합니다. -
tableName
- 문자열, 필수, 읽을 CloudWatch 로그 스트림의 이름입니다. 이 코드 조각은 특수 보기 이름all_log_streams
을 사용합니다. 즉, 반환된 동적 데이터 프레임에는 로그 그룹의 모든 로그 스트림 데이터가 포함됩니다. -
schemaName
- 문자열, 필수, 읽을 CloudWatch 로그 그룹의 이름입니다. 예:/aws-glue/jobs/output
. -
connectionName
- 문자열, 필수, 커넥터와 연결된 연결 이름입니다.
이 커넥터에 대한 추가 옵션은 GitHub의 Amazon Athena CloudWatch 커넥터 README
다음 Python 코드 예제는 AWS Marketplace 커넥터를 사용하여 Athena 데이터 스토어에서 읽는 방법을 보여줍니다. Athena에서 읽고 S3 위치에 쓰는 방법을 보여줍니다.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
custom.spark 또는 marketplace.spark 유형에 대한 연결 옵션
-
className
- 문자열, 필수, 커넥터 클래스 이름입니다. -
secretId
- 문자열, 선택 사항, 커넥터 연결에 대한 자격 증명을 검색하는 데 사용됩니다. -
connectionName
- 문자열, 필수, 커넥터와 연결된 연결 이름입니다. -
다른 옵션은 데이터 스토어에 따라 다릅니다. 예를 들어 OpenSearch 구성 옵션은 Apache Hadoop용 Elasticsearch
설명서에 설명된 대로 접두사 es
로 시작합니다. Snowflake에 대한 Spark 연결은 Connecting to Snowflake 가이드의 Using the Spark Connector에 설명된 대로 sfUser
및sfPassword
와 같은 옵션을 사용합니다.
다음 Python 코드 예제는 marketplace.spark
연결을 사용하여 OpenSearch 데이터 스토어에서 읽는 방법을 보여줍니다.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://
<AWS endpoint>
", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>
","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
일반 옵션
이 섹션의 옵션은 connection_options
로 제공되지만 특별히 하나의 커넥터에만 적용되지는 않습니다.
다음 파라미터는 보통 북마크를 구성할 때 사용됩니다. Amazon S3 또는 JDBC 워크플로에 적용될 수도 있습니다. 자세한 내용은 작업 북마크 사용 단원을 참조하십시오.
jobBookmarkKeys
- 열 이름의 배열입니다.jobBookmarkKeysSortOrder
- 정렬 순서에 따라 값을 비교하는 방법을 정의하는 문자열입니다. 유효한 값:"asc"
,"desc"
.useS3ListImplementation
- Amazon S3 버킷 콘텐츠를 나열할 때 메모리 성능을 관리하는 데 사용됩니다. 자세한 내용은 Optimize memory management in AWS Glue를 참조하세요.