Tipos e opções de conexão para ETL no AWS Glue para Spark
No AWS Glue Glue, vários métodos e transformações do PySpark e do Scala especificam o tipo de conexão usando um parâmetro connectionType
. Eles especificam opções de conexão usando um parâmetro connectionOptions
ou options
.
O parâmetro connectionType
pode usar os valores mostrados na tabela a seguir. Os valores do parâmetro connectionOptions
(ou options
) associados para cada tipo estão documentados nas seções a seguir. Salvo indicação em contrário, os parâmetros se aplicam quando a conexão é usada como fonte ou coletor.
Para obter um código de exemplo que demonstra a configuração e o uso de opções de conexão, consulte a página inicial de cada tipo de conexão.
Valores de connectionType personalizados e AWS Marketplace
Incluindo o seguinte:
-
"connectionType": "marketplace.athena"
: designa uma conexão com um armazenamento de dados do Amazon Athena. A conexão usa um conector de AWS Marketplace. -
"connectionType": "marketplace.spark"
: designa uma conexão com um armazenamento de dados do Apache Spark. A conexão usa um conector de AWS Marketplace. -
"connectionType": "marketplace.jdbc"
: designa uma conexão com um armazenamento de dados do JDBC. A conexão usa um conector de AWS Marketplace. -
"connectionType": "custom.athena"
: designa uma conexão com um armazenamento de dados do Amazon Athena. A conexão usa um conector personalizado que você carrega no AWS Glue Studio. -
"connectionType": "custom.spark"
: designa uma conexão com um armazenamento de dados do Apache Spark. A conexão usa um conector personalizado que você carrega no AWS Glue Studio. -
"connectionType": "custom.jdbc"
: designa uma conexão com um armazenamento de dados do JDBC. A conexão usa um conector personalizado que você carrega no AWS Glue Studio.
Opções de conexão para o tipo custom.jdbc ou marketplace.jdbc
-
className
: string, obrigatório, nome da classe do driver. -
connectionName
: string, obrigatório, nome da conexão associada ao conector. -
url
: string, obrigatório, URL do JDBC com espaços reservados (${}
) que são usados para construir a conexão com a origem dos dados. O espaço reservado${secretKey}
é substituído pelo segredo do mesmo nome em AWS Secrets Manager. Consulte a documentação do armazenamento de dados para obter mais informações sobre como construir o URL. -
secretId
ouuser/password
: string, obrigatório, usado para recuperar credenciais para o URL. -
dbTable
ouquery
: string, obrigatório, a tabela ou consulta SQL da qual obter os dados. Você pode especificardbTable
ouquery
, mas não os dois. -
partitionColumn
: string, opcional, o nome de uma coluna de inteiros usada para o particionamento. Essa opção só funciona quando está incluída emlowerBound
,upperBound
enumPartitions
. Essa opção funciona da mesma maneira que no leitor JDBC Spark SQL. Para obter mais informações, consulte JDBC para outros bancos de dadosno Guia do Apache Spark SQL, DataFrames e conjuntos de dados. Os valores de
lowerBound
eupperBound
são usados para decidir o passo de partição, não para filtrar as linhas na tabela. Todas as linhas na tabela são particionadas e retornadas.nota
Ao usar uma consulta em vez de um nome de tabela, você deve validar se a consulta funciona com a condição de particionamento especificada. Por exemplo:
-
Se o seu formato de consulta for
"SELECT col1 FROM table1"
, teste a consulta anexando uma cláusulaWHERE
no final da consulta que usa a coluna de partição. -
Se o seu formato de consulta for "
SELECT col1 FROM table1 WHERE col2=val"
, teste a consulta estendendo a cláusulaWHERE
comAND
e uma expressão que usa a coluna de partição.
-
-
lowerBound
: inteiro, opcional, o valor mínimo departitionColumn
que é usado para decidir o passo de partição. -
upperBound
: inteiro, opcional, o valor máximo departitionColumn
que é usado para decidir o passo de partição. -
numPartitions
: inteiro, opcional, o número de partições. Esse valor, juntamente comlowerBound
(inclusive) eupperBound
(exclusive), forma os passos de partição para as expressões de cláusulaWHERE
geradas que são usadas para dividir apartitionColumn
.Importante
Tenha cuidado com a quantidade, pois muitas partições podem causar problemas em seus sistemas de banco de dados externo.
-
filterPredicate
: string, opcional, cláusula de condição extra para filtrar dados da fonte. Por exemplo:BillingCity='Mountain View'
Ao usar uma consulta em vez de um nome de tabela, você deve validar que a consulta funciona com o
filterPredicate
especificado. Por exemplo:-
Se o seu formato de consulta for
"SELECT col1 FROM table1"
, teste a consulta anexando uma cláusulaWHERE
no final da consulta que usa o predicado do filtro. -
Se o seu formato de consulta for
"SELECT col1 FROM table1 WHERE col2=val"
, teste a consulta estendendo a cláusulaWHERE
comAND
e uma expressão que usa o predicado do filtro.
-
-
dataTypeMapping
: dicionário, opcional, mapeamento de tipo de dados personalizado que constrói um mapeamento a partir de um tipo de dados JDBC para um tipo de dados Glue. Por exemplo, a opção"dataTypeMapping":{"FLOAT":"STRING"}
mapeia campos de dados JDBC do tipoFLOAT
para o tipoString
do Java chamando o métodoResultSet.getString()
do driver e o usa para complilar registros do AWS Glue. O objetoResultSet
é implantado por cada driver, portanto, o comportamento é específico para o driver que você usa. Consulte a documentação do driver do JDBC para entender como ele executa as conversões. -
Os tipos de dados do AWS Glue compatíveis atualmente são:
-
DATA
-
STRING
-
TIMESTAMP
-
INT
-
FLOAT
-
LONG
-
BIGDECIMAL
-
BYTE
-
SHORT
-
DOUBLE
Os tipos de dados JDBC compatíveis são Java8 java.sql.types
. Os mapeamentos de tipos de dados padrão (de JDBC para o AWS Glue) são:
-
DATE -> DATE
-
VARCHAR -> STRING
-
CHAR -> STRING
-
LONGNVARCHAR -> STRING
-
TIMESTAMP -> TIMESTAMP
-
INTEGER -> INT
-
FLOAT -> FLOAT
-
REAL -> FLOAT
-
BIT -> BOOLEAN
-
BOOLEAN -> BOOLEAN
-
BIGINT -> LONG
-
DECIMAL -> BIGDECIMAL
-
NUMERIC -> BIGDECIMAL
-
TINYINT -> SHORT
-
SMALLINT -> SHORT
-
DOUBLE -> DOUBLE
Se você usar um mapeamento de tipo de dados personalizado com a opção
dataTypeMapping
, poderá substituir um mapeamento de tipo de dados padrão. Somente os tipos de dados JDBC listados na opçãodataTypeMapping
são afetados. O mapeamento padrão é usado para todos os outros tipos de dados JDBC. Você pode adicionar mapeamentos para tipos de dados JDBC adicionais, se necessário. Se um tipo de dados JDBC não estiver incluído no mapeamento padrão ou em um mapeamento personalizado, o tipo de dados será convertido para o tipo de dadosSTRING
do AWS Glue por padrão. -
Os exemplos de código Python a seguir mostram como fazer a leitura de bancos de dados JDBC com drivers JDBC AWS Marketplace. Ele demonstra a leitura de um banco de dados e a gravação em um local S3.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
Opções de conexão para o tipo custom.athena ou marketplace.athena
-
className
: string, obrigatório, nome da classe do driver. Quando você estiver usando o conector Athena-CloudWatch, esse valor de parâmetro será o prefixo da classe Name (Nome) (por exemplo,"com.amazonaws.athena.connectors"
). O conector Athena-CloudWatch é composto por duas classes: um manipulador de metadados e um manipulador de registros. Se você fornecer o prefixo comum aqui, a API carregará as classes corretas com base nesse prefixo. -
tableName
: string, obrigatório, o nome do fluxo de log do CloudWatch a ser lido. Esse trecho de código usa o nome de exibição especialall_log_streams
, o que significa que o quadro de dados dinâmicos retornada conterá dados de todos os fluxos de log no grupo de logs. -
schemaName
: string, obrigatório, o nome do grupo de logs do CloudWatch a ser lido. Por exemplo,/aws-glue/jobs/output
. -
connectionName
: string, obrigatório, nome da conexão associada ao conector.
Para obter opções adicionais para esse conector, consulte o arquivo Amazon Athena CloudWatch Connector README
O exemplo de código Python a seguir mostra como fazer a leitura de um armazenamento de dados do Athena usando um conector AWS Marketplace. Ele demonstra a leitura do Athena e a gravação em um local S3.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://
<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
Opções de conexão para o tipo custom.spark ou marketplace.spark
-
className
: string, obrigatório, nome da classe do conector. -
secretId
: string, opcional, usado para recuperar credenciais para a conexão do conector. -
connectionName
: string, obrigatório, nome da conexão associada ao conector. -
Outras opções dependem do armazenamento de dados. Por exemplo, as opções de configuração do OpenSearch começam com o prefixo
es
, conforme descrito na documentação Elasticsearch for Apache Hadoop(Elasticsearch para Apache Hadoop). Conexões do Spark com o Snowflake usam opções como sfUser
esfPassword
, conforme descrito em Usar o conector do Sparkno guia Conexão com o Snowflake.
O exemplo de código Python a seguir mostra como fazer a leitura de um armazenamento de dados do OpenSearch usando um conector marketplace.spark
.
import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://
<AWS endpoint>
", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>
","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>
/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()
Opções gerais
As opções nesta seção são fornecidas como connection_options
, mas não se aplicam especificamente a um determinado conector.
Os parâmetros a seguir geralmente são usados ao configurar marcadores. Eles podem se aplicar aos fluxos de trabalho do Amazon S3 ou JDBC. Para ter mais informações, consulte Usar marcadores de trabalho.
jobBookmarkKeys
: uma matriz de nomes de coluna.jobBookmarkKeysSortOrder
: string que define como comparar valores com base na ordem de classificação. Valores válidos:"asc"
,"desc"
.useS3ListImplementation
: usado para gerenciar a performance da memória ao listar o conteúdo dos buckets do Amazon S3. Para obter mais informações, consulte Optimize memory management in AWS Glue.