Tipos e opções de conexão para ETL no AWS Glue para Spark - AWS Glue

Tipos e opções de conexão para ETL no AWS Glue para Spark

No AWS Glue Glue, vários métodos e transformações do PySpark e do Scala especificam o tipo de conexão usando um parâmetro connectionType. Eles especificam opções de conexão usando um parâmetro connectionOptions ou options.

O parâmetro connectionType pode usar os valores mostrados na tabela a seguir. Os valores do parâmetro connectionOptions (ou options) associados para cada tipo estão documentados nas seções a seguir. Salvo indicação em contrário, os parâmetros se aplicam quando a conexão é usada como fonte ou coletor.

Para obter um código de exemplo que demonstra a configuração e o uso de opções de conexão, consulte a página inicial de cada tipo de conexão.

connectionType Conecta-se a
dynamodb Banco de dados do Amazon DynamoDB
kinesis Amazon Kinesis Data Streams
s3 Amazon S3
documentdb banco de dados do Amazon DocumentDB (compatível com MongoDB)
opensearch Amazon OpenSearch Service.
redshift Banco de dados do Amazon Redshift
kafka Kafka ou Amazon Managed Streaming for Apache Kafka
azurecosmos Azure Cosmos para NoSQL.
azuresql Azure SQL.
bigquery Google BigQuery.
mongodb Banco de dados MongoDB, incluindo MongoDB Atlas.
sqlserver Banco de dados do Microsoft SQL Server (consulte Conexões JDBC)
mysql Banco de dados MySQL (consulte Conexões JDBC)
oracle Banco de dados Oracle (consulte Conexões JDBC)
postgresql Banco de dados PostgreSQL (consulte Conexões JDBC)
saphana SAP HANA.
snowflake Data lake do Snowflake
teradata Teradata Vantage.
vertica Vertica.
custom.* Armazenamentos de dados do Spark, Athena ou JDBC (consulte Valores de connectionType personalizados e AWS Marketplace
marketplace.* Armazenamentos de dados do Spark, Athena ou JDBC (consulte Valores de connectionType personalizados e AWS Marketplace)

Valores de connectionType personalizados e AWS Marketplace

Incluindo o seguinte:

  • "connectionType": "marketplace.athena": designa uma conexão com um armazenamento de dados do Amazon Athena. A conexão usa um conector de AWS Marketplace.

  • "connectionType": "marketplace.spark": designa uma conexão com um armazenamento de dados do Apache Spark. A conexão usa um conector de AWS Marketplace.

  • "connectionType": "marketplace.jdbc": designa uma conexão com um armazenamento de dados do JDBC. A conexão usa um conector de AWS Marketplace.

  • "connectionType": "custom.athena": designa uma conexão com um armazenamento de dados do Amazon Athena. A conexão usa um conector personalizado que você carrega no AWS Glue Studio.

  • "connectionType": "custom.spark": designa uma conexão com um armazenamento de dados do Apache Spark. A conexão usa um conector personalizado que você carrega no AWS Glue Studio.

  • "connectionType": "custom.jdbc": designa uma conexão com um armazenamento de dados do JDBC. A conexão usa um conector personalizado que você carrega no AWS Glue Studio.

Opções de conexão para o tipo custom.jdbc ou marketplace.jdbc

  • className: string, obrigatório, nome da classe do driver.

  • connectionName: string, obrigatório, nome da conexão associada ao conector.

  • url: string, obrigatório, URL do JDBC com espaços reservados (${}) que são usados para construir a conexão com a origem dos dados. O espaço reservado ${secretKey} é substituído pelo segredo do mesmo nome em AWS Secrets Manager. Consulte a documentação do armazenamento de dados para obter mais informações sobre como construir o URL.

  • secretId ou user/password: string, obrigatório, usado para recuperar credenciais para o URL.

  • dbTable ou query: string, obrigatório, a tabela ou consulta SQL da qual obter os dados. Você pode especificar dbTable ou query, mas não os dois.

  • partitionColumn: string, opcional, o nome de uma coluna de inteiros usada para o particionamento. Essa opção só funciona quando está incluída em lowerBound, upperBound e numPartitions. Essa opção funciona da mesma maneira que no leitor JDBC Spark SQL. Para obter mais informações, consulte JDBC para outros bancos de dados no Guia do Apache Spark SQL, DataFrames e conjuntos de dados.

    Os valores de lowerBound e upperBound são usados para decidir o passo de partição, não para filtrar as linhas na tabela. Todas as linhas na tabela são particionadas e retornadas.

    nota

    Ao usar uma consulta em vez de um nome de tabela, você deve validar se a consulta funciona com a condição de particionamento especificada. Por exemplo:

    • Se o seu formato de consulta for "SELECT col1 FROM table1", teste a consulta anexando uma cláusula WHERE no final da consulta que usa a coluna de partição.

    • Se o seu formato de consulta for "SELECT col1 FROM table1 WHERE col2=val", teste a consulta estendendo a cláusula WHERE com AND e uma expressão que usa a coluna de partição.

  • lowerBound: inteiro, opcional, o valor mínimo de partitionColumn que é usado para decidir o passo de partição.

  • upperBound: inteiro, opcional, o valor máximo de partitionColumn que é usado para decidir o passo de partição.

  • numPartitions: inteiro, opcional, o número de partições. Esse valor, juntamente com lowerBound (inclusive) e upperBound (exclusive), forma os passos de partição para as expressões de cláusula WHERE geradas que são usadas para dividir a partitionColumn.

    Importante

    Tenha cuidado com a quantidade, pois muitas partições podem causar problemas em seus sistemas de banco de dados externo.

  • filterPredicate: string, opcional, cláusula de condição extra para filtrar dados da fonte. Por exemplo:

    BillingCity='Mountain View'

    Ao usar uma consulta em vez de um nome de tabela, você deve validar que a consulta funciona com o filterPredicate especificado. Por exemplo:

    • Se o seu formato de consulta for "SELECT col1 FROM table1", teste a consulta anexando uma cláusula WHERE no final da consulta que usa o predicado do filtro.

    • Se o seu formato de consulta for "SELECT col1 FROM table1 WHERE col2=val", teste a consulta estendendo a cláusula WHERE com AND e uma expressão que usa o predicado do filtro.

  • dataTypeMapping: dicionário, opcional, mapeamento de tipo de dados personalizado que constrói um mapeamento a partir de um tipo de dados JDBC para um tipo de dados Glue. Por exemplo, a opção "dataTypeMapping":{"FLOAT":"STRING"} mapeia campos de dados JDBC do tipo FLOAT para o tipo String do Java chamando o método ResultSet.getString() do driver e o usa para complilar registros do AWS Glue. O objeto ResultSet é implantado por cada driver, portanto, o comportamento é específico para o driver que você usa. Consulte a documentação do driver do JDBC para entender como ele executa as conversões.

  • Os tipos de dados do AWS Glue compatíveis atualmente são:

    • DATA

    • STRING

    • TIMESTAMP

    • INT

    • FLOAT

    • LONG

    • BIGDECIMAL

    • BYTE

    • SHORT

    • DOUBLE

    Os tipos de dados JDBC compatíveis são Java8 java.sql.types.

    Os mapeamentos de tipos de dados padrão (de JDBC para o AWS Glue) são:

    • DATE -> DATE

    • VARCHAR -> STRING

    • CHAR -> STRING

    • LONGNVARCHAR -> STRING

    • TIMESTAMP -> TIMESTAMP

    • INTEGER -> INT

    • FLOAT -> FLOAT

    • REAL -> FLOAT

    • BIT -> BOOLEAN

    • BOOLEAN -> BOOLEAN

    • BIGINT -> LONG

    • DECIMAL -> BIGDECIMAL

    • NUMERIC -> BIGDECIMAL

    • TINYINT -> SHORT

    • SMALLINT -> SHORT

    • DOUBLE -> DOUBLE

    Se você usar um mapeamento de tipo de dados personalizado com a opção dataTypeMapping, poderá substituir um mapeamento de tipo de dados padrão. Somente os tipos de dados JDBC listados na opção dataTypeMapping são afetados. O mapeamento padrão é usado para todos os outros tipos de dados JDBC. Você pode adicionar mapeamentos para tipos de dados JDBC adicionais, se necessário. Se um tipo de dados JDBC não estiver incluído no mapeamento padrão ou em um mapeamento personalizado, o tipo de dados será convertido para o tipo de dados STRING do AWS Glue por padrão.

Os exemplos de código Python a seguir mostram como fazer a leitura de bancos de dados JDBC com drivers JDBC AWS Marketplace. Ele demonstra a leitura de um banco de dados e a gravação em um local S3.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"},"upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4", "partitionColumn":"id","lowerBound":"0","connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.jdbc", connection_options = {"dataTypeMapping":{"INTEGER":"STRING"}, "upperBound":"200","query":"select id, name, department from department where id < 200","numPartitions":"4","partitionColumn":"id","lowerBound":"0", "connectionName":"test-connection-jdbc"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

Opções de conexão para o tipo custom.athena ou marketplace.athena

  • className: string, obrigatório, nome da classe do driver. Quando você estiver usando o conector Athena-CloudWatch, esse valor de parâmetro será o prefixo da classe Name (Nome) (por exemplo, "com.amazonaws.athena.connectors"). O conector Athena-CloudWatch é composto por duas classes: um manipulador de metadados e um manipulador de registros. Se você fornecer o prefixo comum aqui, a API carregará as classes corretas com base nesse prefixo.

  • tableName: string, obrigatório, o nome do fluxo de log do CloudWatch a ser lido. Esse trecho de código usa o nome de exibição especial all_log_streams, o que significa que o quadro de dados dinâmicos retornada conterá dados de todos os fluxos de log no grupo de logs.

  • schemaName: string, obrigatório, o nome do grupo de logs do CloudWatch a ser lido. Por exemplo, /aws-glue/jobs/output.

  • connectionName: string, obrigatório, nome da conexão associada ao conector.

Para obter opções adicionais para esse conector, consulte o arquivo Amazon Athena CloudWatch Connector README (LEIAME do conector Amazon Athena CloudWatch) no GitHub.

O exemplo de código Python a seguir mostra como fazer a leitura de um armazenamento de dados do Athena usando um conector AWS Marketplace. Ele demonstra a leitura do Athena e a gravação em um local S3.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams","schemaName":"/aws-glue/jobs/output", "connectionName":"test-connection-athena"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.athena", connection_options = {"tableName":"all_log_streams",, "schemaName":"/aws-glue/jobs/output","connectionName": "test-connection-athena"}, transformation_ctx = "DataSource0") ## @type: ApplyMapping ## @args: [mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0"] ## @return: Transform0 ## @inputs: [frame = DataSource0] Transform0 = ApplyMapping.apply(frame = DataSource0, mappings = [("department", "string", "department", "string"), ("name", "string", "name", "string"), ("id", "int", "id", "int")], transformation_ctx = "Transform0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = Transform0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = Transform0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

Opções de conexão para o tipo custom.spark ou marketplace.spark

  • className: string, obrigatório, nome da classe do conector.

  • secretId: string, opcional, usado para recuperar credenciais para a conexão do conector.

  • connectionName: string, obrigatório, nome da conexão associada ao conector.

  • Outras opções dependem do armazenamento de dados. Por exemplo, as opções de configuração do OpenSearch começam com o prefixo es, conforme descrito na documentação Elasticsearch for Apache Hadoop (Elasticsearch para Apache Hadoop). Conexões do Spark com o Snowflake usam opções como sfUser e sfPassword, conforme descrito em Usar o conector do Spark no guia Conexão com o Snowflake.

O exemplo de código Python a seguir mostra como fazer a leitura de um armazenamento de dados do OpenSearch usando um conector marketplace.spark.

import sys from awsglue.transforms import * from awsglue.utils import getResolvedOptions from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job ## @params: [JOB_NAME] args = getResolvedOptions(sys.argv, ['JOB_NAME']) sc = SparkContext() glueContext = GlueContext(sc) spark = glueContext.spark_session job = Job(glueContext) job.init(args['JOB_NAME'], args) ## @type: DataSource ## @args: [connection_type = "marketplace.spark", connection_options = {"path":"test", "es.nodes.wan.only":"true","es.nodes":"https://<AWS endpoint>", "connectionName":"test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0"] ## @return: DataSource0 ## @inputs: [] DataSource0 = glueContext.create_dynamic_frame.from_options(connection_type = "marketplace.spark", connection_options = {"path":"test","es.nodes.wan.only": "true","es.nodes":"https://<AWS endpoint>","connectionName": "test-spark-es","es.port":"443"}, transformation_ctx = "DataSource0") ## @type: DataSink ## @args: [connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0"] ## @return: DataSink0 ## @inputs: [frame = DataSource0] DataSink0 = glueContext.write_dynamic_frame.from_options(frame = DataSource0, connection_type = "s3", format = "json", connection_options = {"path": "s3://<S3 path>/", "partitionKeys": []}, transformation_ctx = "DataSink0") job.commit()

Opções gerais

As opções nesta seção são fornecidas como connection_options, mas não se aplicam especificamente a um determinado conector.

Os parâmetros a seguir geralmente são usados ao configurar marcadores. Eles podem se aplicar aos fluxos de trabalho do Amazon S3 ou JDBC. Para ter mais informações, consulte Usar marcadores de trabalho.

  • jobBookmarkKeys: uma matriz de nomes de coluna.

  • jobBookmarkKeysSortOrder: string que define como comparar valores com base na ordem de classificação. Valores válidos: "asc", "desc".

  • useS3ListImplementation: usado para gerenciar a performance da memória ao listar o conteúdo dos buckets do Amazon S3. Para obter mais informações, consulte Optimize memory management in AWS Glue.