Conexões do DynamoDB - AWS Glue

Conexões do DynamoDB

É possível usar o AWS Glue para Spark para ler e gravar em tabelas no DynamoDB no AWS Glue. Conecte-se ao DynamoDB usando as permissões do IAM anexadas ao seu trabalho do AWS Glue. O AWS Glueoferece suporte à gravação de dados em uma tabela do DynamoDB em uma conta da AWS diferente. Para ter mais informações, consulte Acesso a tabelas do DynamoDB entre contas e entre regiões.

Além do conector de ETL para AWS Glue DynamoDB, é possível ler do DynamoDB usando o conector de exportação para DynamoDB. Esse conector invoca uma solicitação ExportTableToPointInTime do DynamoDB e a armazena no formato DynamoDB JSON em um local do Amazon S3 fornecido por você. Em seguida, o AWS Glue cria um objeto DynamicFrame ao ler os dados do local de exportação do Amazon S3.

O gravador do DynamoDB é compatível com o AWS Glue versão 1.0 ou posterior. O conector de exportação do AWS Glue DynamoDB é compatível com o AWS Glue versão 2.0 ou posterior.

Para obter mais informações sobre o DynamoDB, consulte a documentação do Amazon DynamoDB.

nota

O leitor de ETL do DynamoDB não é compatível com filtros ou predicados de aplicação.

Configurar conexões do DynamoDB

Para se conectar ao DynamoDB via AWS Glue, conceda ao perfil do IAM associado ao seu trabalho do AWS Glue permissão para interagir com o DynamoDB. Para obter mais informações sobre as permissões necessárias para ler ou gravar no DynamoDB, consulte Ações, recursos e chaves de condição para DynamoDB na documentação do IAM.

Nas seguintes situações, configurações adicionais podem ser necessárias:

  • Ao usar o conector de exportação do DynamoDB, você precisará configurar o IAM para que seu trabalho possa solicitar exportações de tabelas do DynamoDB. Além disso, você precisará identificar um bucket do Amazon S3 para a exportação e fornecer as permissões apropriadas no IAM para que o DynamoDB grave nele e para que seu trabalho do AWS Glue possa ler dele. Para obter mais informações, consulte Solicitar uma exportação de tabela no DynamoDB.

  • Se sua tarefa do AWS Glue tiver requisitos específicos de conectividade da Amazon VPC, use o tipo de conexão NETWORK do AWS Glue para fornecer opções de rede. Como o acesso ao DynamoDB é autorizado pelo IAM, não há necessidade de um tipo de conexão AWS Glue DynamoDB.

Ler e gravar no DynamoDB

Os exemplos de código a seguir mostram como fazer a leitura (via conector ETL) e gravação de tabelas do DynamoDB. Eles demonstram a leitura de uma tabela e a gravação em uma outra tabela.

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={"dynamodb.input.tableName": test_source, "dynamodb.throughput.read.percent": "1.0", "dynamodb.splits": "100" } ) print(dyf.getNumPartitions()) glue_context.write_dynamic_frame_from_options( frame=dyf, connection_type="dynamodb", connection_options={"dynamodb.output.tableName": test_sink, "dynamodb.throughput.write.percent": "1.0" } ) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.input.tableName" -> test_source, "dynamodb.throughput.read.percent" -> "1.0", "dynamodb.splits" -> "100" )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) val dynamoDbSink: DynamoDbDataSink = glueContext.getSinkWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.output.tableName" -> test_sink, "dynamodb.throughput.write.percent" -> "1.0" )) ).asInstanceOf[DynamoDbDataSink] dynamoDbSink.writeDynamicFrame(dynamicFrame) Job.commit() } }

Usar o conector de exportação do DynamoDB

O conector de exportação apresenta um melhor desempenho do que o conector de ETL quando o tamanho da tabela do DynamoDB é superior a 80 GB. Além disso, como a solicitação de exportação é conduzida fora dos processos do Spark em um trabalho do AWS Glue, você pode habilitar a autoescalabilidade de trabalhos do AWS Glue para poupar o uso de DPU durante a solicitação de exportação. Com o conector de exportação, você também não precisa configurar o número de divisões para o paralelismo do executor do Spark ou o percentual de leitura de throughput do DynamoDB.

nota

O DynamoDB tem requisitos específicos para invocar as solicitações ExportTableToPointInTime. Para mais informações, consulte Solicitação de uma exportação de tabela no DynamoDB. Por exemplo, para usar esse conector, é necessário que a Point-in-Time-Recovery (PITR – Recuperação em um ponto anterior no tempo) esteja habilitada na tabela. O conector do DynamoDB também é compatível com criptografia do AWS KMS para exportações do DynamoDB para o Amazon S3. O fornecimento de sua configuração de segurança na configuração do trabalho do AWS Glue habilita a criptografia do AWS KMS para uma exportação do DynamoDB. A chave do KMS precisa estar na mesma região do bucket do Amazon S3.

Observe que há cobranças adicionais para exportação do DynamoDB e custos de armazenamento do Amazon S3. Os dados exportados no Amazon S3 persistem após a conclusão de uma execução de trabalho, de modo que você possa reutilizá-los sem exportações adicionais do DynamoDB. Um requisito para o uso desse conector é que a recuperação a um ponto anterior no tempo (PITR) esteja habilitada para a tabela.

O conector de ETL e o conector de exportação do DynamoDB não são compatíveis com a aplicação de filtros ou predicados de aplicação na fonte do DynamoDB.

Os exemplos de código a seguir mostram como fazer a leitura (usando o conector de exportação) e imprimir o número de partições.

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dyf = glue_context.create_dynamic_frame.from_options( connection_type="dynamodb", connection_options={ "dynamodb.export": "ddb", "dynamodb.tableArn": test_source, "dynamodb.s3.bucket": bucket_name, "dynamodb.s3.prefix": bucket_prefix, "dynamodb.s3.bucketOwner": account_id_of_bucket, } ) print(dyf.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getSourceWithFormat( connectionType = "dynamodb", options = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.tableArn" -> test_source, "dynamodb.s3.bucket" -> bucket_name, "dynamodb.s3.prefix" -> bucket_prefix, "dynamodb.s3.bucketOwner" -> account_id_of_bucket, )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) Job.commit() } }

Estes exemplos mostram como fazer a leitura (usando o conector de exportação) e imprimir o número de partições de uma tabela do AWS Glue Data Catalog que tenha uma classificação dynamodb:

Python
import sys from pyspark.context import SparkContext from awsglue.context import GlueContext from awsglue.job import Job from awsglue.utils import getResolvedOptions args = getResolvedOptions(sys.argv, ["JOB_NAME"]) glue_context= GlueContext(SparkContext.getOrCreate()) job = Job(glue_context) job.init(args["JOB_NAME"], args) dynamicFrame = glue_context.create_dynamic_frame.from_catalog( database=catalog_database, table_name=catalog_table_name, additional_options={ "dynamodb.export": "ddb", "dynamodb.s3.bucket": s3_bucket, "dynamodb.s3.prefix": s3_bucket_prefix } ) print(dynamicFrame.getNumPartitions()) job.commit()
Scala
import com.amazonaws.services.glue.GlueContext import com.amazonaws.services.glue.util.GlueArgParser import com.amazonaws.services.glue.util.Job import com.amazonaws.services.glue.util.JsonOptions import com.amazonaws.services.glue.DynamoDbDataSink import org.apache.spark.SparkContext import scala.collection.JavaConverters._ object GlueApp { def main(sysArgs: Array[String]): Unit = { val glueContext = new GlueContext(SparkContext.getOrCreate()) val args = GlueArgParser.getResolvedOptions(sysArgs, Seq("JOB_NAME").toArray) Job.init(args("JOB_NAME"), glueContext, args.asJava) val dynamicFrame = glueContext.getCatalogSource( database = catalog_database, tableName = catalog_table_name, additionalOptions = JsonOptions(Map( "dynamodb.export" -> "ddb", "dynamodb.s3.bucket" -> s3_bucket, "dynamodb.s3.prefix" -> s3_bucket_prefix )) ).getDynamicFrame() print(dynamicFrame.getNumPartitions()) )

Simplificar o uso do JSON de exportação do DynamoDB

As exportações do DynamoDB feitas com o conector de exportação para DynamoDB do AWS Glue resultam em arquivos JSON de estruturas aninhadas específicas. Para mais informações, consulte Objetos de dados. O AWS Glue fornece uma transformação DynamicFrame, que pode desaninhar essas estruturas em uma forma mais fácil de usar para aplicações downstream.

A transformação pode ser invocada de duas formas. É possível definir a opção de conexão "dynamodb.simplifyDDBJson" com o valor "true" ao chamar um método para ler do DynamoDB. Você também pode chamar a transformação como um método disponível de forma independente na biblioteca do AWS Glue.

Considere o seguinte esquema gerado por uma exportação do DynamoDB:

root |-- Item: struct | |-- parentMap: struct | | |-- M: struct | | | |-- childMap: struct | | | | |-- M: struct | | | | | |-- appName: struct | | | | | | |-- S: string | | | | | |-- packageName: struct | | | | | | |-- S: string | | | | | |-- updatedAt: struct | | | | | | |-- N: string | |-- strings: struct | | |-- SS: array | | | |-- element: string | |-- numbers: struct | | |-- NS: array | | | |-- element: string | |-- binaries: struct | | |-- BS: array | | | |-- element: string | |-- isDDBJson: struct | | |-- BOOL: boolean | |-- nullValue: struct | | |-- NULL: boolean

A transformação simplifyDDBJson simplificará isso para:

root |-- parentMap: struct | |-- childMap: struct | | |-- appName: string | | |-- packageName: string | | |-- updatedAt: string |-- strings: array | |-- element: string |-- numbers: array | |-- element: string |-- binaries: array | |-- element: string |-- isDDBJson: boolean |-- nullValue: null
nota

simplifyDDBJson só está disponível no AWS Glue 3.0 e versões posteriores. A transformação unnestDDBJson também está disponível para simplificar o JSON de exportação do DynamoDB. Incentivamos os usuários a fazer a transição de unnestDDBJson para simplifyDDBJson.

Configurar o paralelismo nas operações do DynamoDB

Para aumentar a performance, é possível ajustar alguns parâmetros disponíveis para o conector do DynamoDB. Seu objetivo ao ajustar os parâmetros de paralelismo é maximizar o uso dos operadores do AWS Glue provisionados. Assim, se você precisar de mais performance, recomendamos aumentar a escala do seu trabalho horizontalmente aumentando o número de DPUs.

Você pode alterar o paralelismo em uma operação de leitura do DynamoDB usando o parâmetro dynamodb.splits ao utilizar o conector ETL. Ao ler com o conector de exportação, não é necessário configurar o número de divisões para o paralelismo do executor do Spark. Você pode alterar o paralelismo em uma operação de gravação do DynamoDB com dynamodb.output.numParallelTasks.

Ler com o conector ETL do DynamoDB

Recomendamos calcular dynamodb.splits com base no número máximo de trabalhadores definido em sua configuração de trabalho e no cálculo numSlots a seguir. Em caso de escalonamento automático, o número real de operadores disponíveis pode cair abaixo desse limite. Para obter mais informações sobre como definir o número máximo de trabalhadores, consulte Número de trabalhadores (NumberOfWorkers) em Configurar propriedades de trabalho para trabalhos do Spark no AWS Glue.

  • numExecutors = NumberOfWorkers - 1

    Para contextualizar, um executor é reservado para o driver do Spark, outros executores são usados para processar dados.

  • numSlotsPerExecutor =

    AWS Glue 3.0 and later versions
    • 4 se WorkerType for G.1X

    • 8 se WorkerType for G.2X

    • 16 se WorkerType for G.4X

    • 32 se WorkerType for G.8X

    AWS Glue 2.0 and legacy versions
    • 8 se WorkerType for G.1X

    • 16 se WorkerType for G.2X

  • numSlots = numSlotsPerExecutor * numExecutors

Recomendamos definir dynamodb.splits como o número de slots disponíveis, numSlots.

Gravar no DynamoDB

O parâmetro dynamodb.output.numParallelTasks é usado para determinar a WCU por tarefa do Spark, usando o seguinte cálculo:

permittedWcuPerTask = ( TableWCU * dynamodb.throughput.write.percent ) / dynamodb.output.numParallelTasks

O gravador do DynamoDB funcionará melhor se a configuração representar com precisão o número de tarefas do Spark gravadas no DynamoDB. Em alguns casos, talvez seja necessário substituir o cálculo padrão para aumentar o desempenho de gravação. Se você não especificar esse parâmetro, as WCU permitidas por tarefa do Spark serão calculadas automaticamente pela seguinte fórmula:

    • numPartitions = dynamicframe.getNumPartitions()

    • numSlots (conforme definido anteriormente nesta seção)

    • numParallelTasks = min(numPartitions, numSlots)

  • Exemplo 1. DPU=10, WorkerType=Standard. DynamicFrame de entrada tem 100 partições RDD.

    • numPartitions = 100

    • numExecutors = (10 - 1) * 2 - 1 = 17

    • numSlots = 4 * 17 = 68

    • numParallelTasks = min(100, 68) = 68

  • Exemplo 2. DPU=10, WorkerType=Standard. DynamicFrame de entrada tem 20 partições RDD.

    • numPartitions = 20

    • numExecutors = (10 - 1) * 2 - 1 = 17

    • numSlots = 4 * 17 = 68

    • numParallelTasks = min(20, 68) = 20

nota

Trabalhos em versões antigas do AWS Glue e aqueles que usam operadores Standard exigem métodos diferentes para calcular o número de slots. Se você precisar ajustar a performance desses trabalhos, recomendamos fazer a transição para as versões compatíveis do AWS Glue.

Referência de opções de conexão do DynamoDB

Designa uma conexão com o Amazon DynamoDB.

As opções de conexão diferem para uma conexão da fonte e uma conexão do coletor.

“connectionType”: “dynamodb” com o conector de ETL como fonte

Ao usar o conector de ETL para DynamoDB do AWS Glue, use as seguintes opções de conexão com "connectionType": "dynamodb" como fonte:

  • "dynamodb.input.tableName": (obrigatório) a tabela do DynamoDB da qual fazer a leitura.

  • "dynamodb.throughput.read.percent": (opcional) a porcentagem de unidades de capacidade de leitura (RCU) para usar. O padrão é definido como "0,5". Os valores aceitáveis são de "0,1" a "1,5", inclusive.

    • 0.5 representa a taxa de leitura padrão, o que significa que o AWS Glue tentará consumir metade da capacidade de leitura da tabela. Se você aumentar o valor acima de 0.5, o AWS Glue aumentará a taxa de solicitação. Diminuir o valor abaixo de 0.5 reduz a taxa de solicitação de leitura. (A taxa de leitura real poderá variar, dependendo de fatores como se há uma distribuição de chaves uniformes na tabela do DynamoDB.)

    • Quando a tabela do DynamoDB está no modo sob demanda, o AWS Glue lida com a capacidade de leitura da tabela como 40.000. Para exportar uma tabela grande, recomendamos alternar sua tabela do DynamoDB para o modo sob demanda.

  • "dynamodb.splits": (opcional) define em quantas divisões essa tabela do DynamoDB deve ser particionada ao fazer a leitura. O padrão é definido como "1". Os valores aceitáveis são de "1" a "1,000,000", inclusive.

    1 indica que não há paralelismo. É altamente recomendável que você especifique um valor maior para uma performance melhor usando a fórmula abaixo. Para obter mais informações sobre como definir adequadamente um valor, consulte Configurar o paralelismo nas operações do DynamoDB.

  • "dynamodb.sts.roleArn": (opcional) o ARN da função do IAM a ser assumida para acesso entre contas. Esse parâmetro está disponível no AWS Glue 1.0 ou posterior.

  • "dynamodb.sts.roleSessionName": (opcional) nome da sessão STS. O padrão é definido como “glue-dynamodb-read-sts-session”. Esse parâmetro está disponível no AWS Glue 1.0 ou posterior.

“connectionType”: “dynamodb” com o conector de exportação para DynamoDB do AWS Glue como fonte

Use as seguintes opções de conexão com "connectionType": "dynamodb" como fonte ao usar o conector de exportação para DynamoDB do AWS Glue, que está disponível apenas para o AWS Glue versão 2.0 em diante:

  • "dynamodb.export": (obrigatório) um valor de string:

    • Se definido como ddb, habilita o conector de exportação para DynamoDB do AWS Glue. Um novo ExportTableToPointInTimeRequest será invocado durante o trabalho do AWS Glue. Uma nova exportação será gerada com o local repassado de dynamodb.s3.bucket e dynamodb.s3.prefix.

    • Se definido como s3, habilita o conector de exportação para DynamoDB do AWS Glue, mas ignora a criação de uma nova exportação do DynamoDB. Em vez disso, usa o dynamodb.s3.bucket e dynamodb.s3.prefix como o local do Amazon S3 de uma exportação anterior dessa tabela.

  • "dynamodb.tableArn": (obrigatório) a tabela do DynamoDB da qual fazer a leitura.

  • "dynamodb.unnestDDBJson": (Opcional) Padrão: false. Valores válidos: booleano. Se definido como “true” (verdadeiro), executa uma transformação de desaninhamento da estrutura JSON do DynamoDB que está presente nas exportações. É um erro definir "dynamodb.unnestDDBJson" e "dynamodb.simplifyDDBJson" como verdadeiro ao mesmo tempo. No AWS Glue 3.0 e versões posteriores, recomendamos usar "dynamodb.simplifyDDBJson" para melhorar o comportamento ao simplificar os tipos de mapas do DynamoDB. Para ter mais informações, consulte Simplificar o uso do JSON de exportação do DynamoDB.

  • "dynamodb.simplifyDDBJson": (Opcional) Padrão: false. Valores válidos: booleano. Se definido como “true” (verdadeiro), executa uma transformação para simplificar o esquema da estrutura JSON do DynamoDB que está presente nas exportações. Isso tem a mesma finalidade que a opção "dynamodb.unnestDDBJson", mas fornece melhor suporte a tipos de mapas do DynamoDB ou até mesmo tipos de mapas aninhados em sua tabela do DynamoDB. Esse atributo só está disponível no AWS Glue 3.0 e versões posteriores. É um erro definir "dynamodb.unnestDDBJson" e "dynamodb.simplifyDDBJson" como verdadeiro ao mesmo tempo. Para ter mais informações, consulte Simplificar o uso do JSON de exportação do DynamoDB.

  • "dynamodb.s3.bucket": (opcional) indica o local do bucket do Amazon S3 no qual o processo ExportTableToPointInTime do DynamoDB deve ser executado. O formato de arquivo para a exportação é DynamoDB JSON.

    • "dynamodb.s3.prefix": (Opcional) indica o local do prefixo do Amazon S3 dentro do bucket do Amazon S3 no qual as cargas ExportTableToPointInTime do DynamoDB devem ser armazenadas. Se não houver a especificação de dynamodb.s3.prefix e dynamodb.s3.bucket, esses valores serão definidos por padrão para o local do diretório temporário especificado na configuração de trabalho do AWS Glue. Para mais informações, consulte Parâmetros especiais usados pelo AWS Glue.

    • "dynamodb.s3.bucketOwner": indica o proprietário do bucket necessário para acesso entre contas do Amazon S3.

  • "dynamodb.sts.roleArn": (opcional) o ARN do perfil do IAM a ser assumido para acesso entre contas e/ou acesso entre regiões para a tabela do DynamoDB. Observação: o mesmo ARN de função do IAM será usado para acessar o local do Amazon S3 especificado para a solicitação ExportTableToPointInTime.

  • "dynamodb.sts.roleSessionName": (opcional) nome da sessão STS. O padrão é definido como “glue-dynamodb-read-sts-session”.

  • "dynamodb.exportTime" (Opcional) Valores válidos: strings representando instantes ISO-8601. Um momento no qual a exportação deve ser feita.

  • "dynamodb.sts.region": (obrigatório se estiver fazendo uma chamada entre regiões usando um endpoint regional) a região que hospeda a tabela do DynamoDB que você deseja ler.

“connectionType”: “dynamodb” com o conector de ETL como coletor

Use as seguintes opções de conexão com "connectionType": "dynamodb" como coletor:

  • "dynamodb.output.tableName": (obrigatório) a tabela do DynamoDB na qual fazer a gravação.

  • "dynamodb.throughput.write.percent": (opcional) a porcentagem de unidades de capacidade de gravação (WCU) para usar. O padrão é definido como "0,5". Os valores aceitáveis são de "0,1" a "1,5", inclusive.

    • 0.5 representa a taxa de gravação padrão, o que significa que o AWS Glue tentará consumir metade da capacidade de gravação da tabela. Se você aumentar o valor acima de 0,5, o AWS Glue aumentará a taxa de solicitação. Diminuir o valor abaixo de 0,5 reduz a taxa de solicitação de gravação. (A taxa de gravação real pode variar, dependendo de fatores como a existência de uma distribuição de chaves uniforme na tabela do DynamoDB.)

    • Quando a tabela do DynamoDB está no modo sob demanda, o AWS Glue lida com a capacidade de gravação da tabela como 40000. Para importar uma tabela grande, recomendamos mudar sua tabela do DynamoDB para o modo sob demanda.

  • "dynamodb.output.numParallelTasks": (opcional) define quantas tarefas simultâneas gravam no DynamoDB ao mesmo tempo. Usado para calcular as WCU permissivas por tarefa do Spark. Na maioria dos casos, o AWS Glue calculará um valor padrão razoável para esse valor. Para ter mais informações, consulte Configurar o paralelismo nas operações do DynamoDB.

  • "dynamodb.output.retry": (opcional) define quantas novas tentativas realizamos quando há uma ProvisionedThroughputExceededException do DynamoDB. O padrão é definido como “10”.

  • "dynamodb.sts.roleArn": (opcional) o ARN da função do IAM a ser assumida para acesso entre contas.

  • "dynamodb.sts.roleSessionName": (opcional) nome da sessão STS. O padrão é definido como “glue-dynamodb-write-sts-session”.