Selecione suas preferências de cookies

Usamos cookies essenciais e ferramentas semelhantes que são necessárias para fornecer nosso site e serviços. Usamos cookies de desempenho para coletar estatísticas anônimas, para que possamos entender como os clientes usam nosso site e fazer as devidas melhorias. Cookies essenciais não podem ser desativados, mas você pode clicar em “Personalizar” ou “Recusar” para recusar cookies de desempenho.

Se você concordar, a AWS e terceiros aprovados também usarão cookies para fornecer recursos úteis do site, lembrar suas preferências e exibir conteúdo relevante, incluindo publicidade relevante. Para aceitar ou recusar todos os cookies não essenciais, clique em “Aceitar” ou “Recusar”. Para fazer escolhas mais detalhadas, clique em “Personalizar”.

Conexões do Kinesis

Modo de foco
Conexões do Kinesis - AWS Glue

É possível usar uma conexão do Kinesis para ler e gravar o Amazon Kinesis Data Streams usando informações armazenadas em uma tabela do Data Catalog ou fornecendo informações para acessar diretamente o fluxo de dados. Você pode ler informações do Kinesis em um Spark DataFrame e depois convertê-las em um Glue DynamicFrame AWS. Você pode gravar DynamicFrames no Kinesis em um formato JSON. Se você acessar diretamente o fluxo de dados, use essas opções para fornecer as informações sobre como acessar o fluxo de dados.

Se você usar getCatalogSource ou create_data_frame_from_catalog para consumir registros de uma fonte de transmissão do Kinesis, o trabalho tem o banco de dados do catálogo de dados e as informações de nome da tabela, e pode usá-los para obter alguns parâmetros básicos para leitura da fonte de transmissão do Kinesis. Se você usar getSource, getSourceWithFormat, createDataFrameFromOptions ou create_data_frame_from_options, será necessário especificar esses parâmetros básicos usando as opções de conexão descritas aqui.

Você pode especificar as opções de conexão para o Kinesis usando os seguintes argumentos para os métodos especificados na classe GlueContext.

  • Scala

    • connectionOptions: usar com getSource, createDataFrameFromOptions, getSink

    • additionalOptions: usar com getCatalogSource, getCatalogSink

    • options: usar com getSourceWithFormat, getSinkWithFormat

  • Python

    • connection_options: usar com create_data_frame_from_options, write_dynamic_frame_from_options

    • additional_options: usar com create_data_frame_from_catalog, write_dynamic_frame_from_catalog

    • options: usar com getSource, getSink

Para notas e restrições sobre trabalhos de ETL de streaming, consulte Notas e restrições sobre ETL de transmissão.

Configurar o Kinesis

Para ler um fluxo de dados do Kinesis em uma trabalho do AWS Glue Spark, você precisará de alguns pré-requisitos:

  • Se for leitura, o trabalho do AWS Glue deve ter permissões IAM do nível de acesso de leitura ao fluxo de dados do Kinesis.

  • Se for gravação, o trabalho do AWS Glue deve ter permissões IAM do nível de acesso de gravação ao fluxo de dados do Kinesis.

Em certos casos, você precisará configurar pré-requisitos adicionais:

  • Se o trabalho do AWS Glue estiver configurado com conexões de rede adicionais (normalmente para se conectar a outros conjuntos de dados) e uma dessas conexões fornecer opções de rede da Amazon VPC, isso direcionará o trabalho para se comunicar pela Amazon VPC. Nesse caso, você também precisará configurar o fluxo de dados do Kinesis para se comunicar pela Amazon VPC. É possível fazer isso criando um endpoint da VPC de interface entre a Amazon VPC e o fluxo de dados do Kinesis. Para obter mais informações, consulte Using Kinesis Data Streams with Interface VPC Endpoints.

  • Ao especificar Amazon Kinesis Data Streams em outra conta, você deve configurar os perfis e políticas para permitir o acesso entre contas. Para obter mais informações, consulte Exemplo: Ler de uma transmissão do Kinesis em outra conta.

Para obter mais informações sobre pré-requisitos de trabalho de ETL de streaming, consulte Trabalhos de transmissão de ETL no AWS Glue.

Exemplo: ler de fluxos do Kinesis

Exemplo: ler de fluxos do Kinesis

Usado em conjunto com forEachBatch.

Exemplo para fonte de transmissão do Amazon Kinesis:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Exemplo: gravação em streams do Kinesis

Exemplo: ler de fluxos do Kinesis

Usado em conjunto com forEachBatch.

Exemplo para fonte de transmissão do Amazon Kinesis:

kinesis_options = { "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream", "startingPosition": "TRIM_HORIZON", "inferSchema": "true", "classification": "json" } data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)

Referência de opções de conexão do Kinesis

Designa opções de conexão para o Amazon Kinesis Data Streams.

Use as seguintes opções de conexão para fontes de dados de transmissão do Kinesis:

  • "streamARN" (obrigatório) usado para leitura/gravação. O ARN do fluxo de dados do Kinesis.

  • "classification" (Obrigatório para leitura) Usado para leitura. O formato de arquivo usado pelos dados no registro. Obrigatório, a menos que fornecido por meio do catálogo de dados.

  • "streamName" (opcional) usado para leitura. O nome de um fluxo de dados do Kinesis de onde ler. Usado com endpointUrl.

  • "endpointUrl" (opcional) usado para leitura. Padrão: “https://kinesis.us-east-1.amazonaws.com” O endpoint AWS do stream do Kinesis. Você não precisa alterar isso, a menos que esteja se conectando a uma região especial.

  • "partitionKey" (opcional) usado para gravação. A chave de partição do Kinesis usada na produção de registros.

  • "delimiter" (opcional) usado para leitura. O separador de valores usado quando a classification é CSV. O padrão é ",".

  • "startingPosition": (opcional) usado para leitura. A posição inicial no fluxo de dados do Kinesis de onde ler os dados. Os valores possíveis são "latest", "trim_horizon", "earliest" ou uma string de timestamp no formato UTC no padrão yyyy-mm-ddTHH:MM:SSZ (onde Z representa um desvio do fuso horário UTC com +/-). Por exemplo: "2023-04-04T08:00:00-04:00"). O valor padrão é "latest". Observação: a string de timestamp no formato UTC para "startingPosition" é compatível somente com a versão 4.0 ou posterior do AWS Glue.

  • "failOnDataLoss": (Opcional) Falha na tarefa se algum fragmento ativo estiver ausente ou expirado. O valor padrão é "false".

  • "awsSTSRoleARN": (opcional) usado para leitura/gravação. O nome de recurso da Amazon (ARN) da função a ser assumida com o uso do AWS Security Token Service (AWS STS). Essa função deve ter permissões para descrever ou ler operações de registro para o fluxo de dados do Kinesis. Você deve usar esse parâmetro ao acessar um fluxo de dados em uma conta diferente. Usado em conjunto com "awsSTSSessionName".

  • "awsSTSSessionName": (opcional) usado para leitura/gravação. Um identificador para a sessão que assume a função usando o AWS STS. Você deve usar esse parâmetro ao acessar um fluxo de dados em uma conta diferente. Usado em conjunto com "awsSTSRoleARN".

  • "awsSTSEndpoint": (Opcional) O AWS STS endpoint a ser usado ao se conectar ao Kinesis com uma função assumida. Isso permite usar o AWS STS endpoint regional em uma VPC, o que não é possível com o endpoint global padrão.

  • "maxFetchTimeInMs": (opcional) usado para leitura. O tempo máximo para o executor do trabalho ler registros referentes ao lote atual do fluxo de dados do Kinesis especificado em milissegundos (ms). Várias chamadas de API GetRecords podem ser feitas nesse período. O valor padrão é 1000.

  • "maxFetchRecordsPerShard": (opcional) usado para leitura. O número máximo de registros a serem obtidos por fragmento no fluxo de dados do Kinesis por microlote. Observação: o cliente poderá exceder esse limite se o trabalho de streaming já tiver lido registros extras do Kinesis (na mesma chamada get-records). Se maxFetchRecordsPerShard precisa ser rigoroso, então precisa ser um múltiplo de maxRecordPerRead. O valor padrão é 100000.

  • "maxRecordPerRead": (opcional) usado para leitura. O número máximo de registros a serem obtidos por fragmento no fluxo de dados do Kinesis em cada operação getRecords. O valor padrão é 10000.

  • "addIdleTimeBetweenReads": (opcional) usado para leitura. Adiciona um atraso de tempo entre duas operações getRecords. O valor padrão é "False". Essa opção só pode ser configurada para o Glue versão 2.0 e posterior.

  • "idleTimeBetweenReadsInMs": (opcional) usado para leitura. O atraso mínimo entre duas operações , especificado em ms. O valor padrão é 1000. Essa opção só pode ser configurada para o Glue versão 2.0 e posterior.

  • "describeShardInterval": (opcional) usado para leitura. O intervalo de tempo mínimo entre duas chamadas de API ListShards para que seu script considere a refragmentação. Para obter mais informações, consulte Estratégias para refragmentação no Guia do desenvolvedor do Amazon Kinesis Data Streams. O valor padrão é 1s.

  • "numRetries": (opcional) usado para leitura. O número máximo de novas tentativas para solicitações de API do Kinesis Data Streams. O valor padrão é 3.

  • "retryIntervalMs": (opcional) usado para leitura. O período de espera (especificado em ms) antes de repetir a chamada da API Kinesis Data Streams. O valor padrão é 1000.

  • "maxRetryIntervalMs": (opcional) usado para leitura. O período de espera máximo (especificado em ms) entre duas tentativas de uma chamada de API Kinesis Data Streams. O valor padrão é 10000.

  • "avoidEmptyBatches": (opcional) usado para leitura. Evita a criação de um trabalho de micro lote vazio verificando se há dados não lidos no fluxo de dados do Kinesis antes de o lote ser iniciado. O valor padrão é "False".

  • "schema": (Obrigatório quando inferSchema é definido como false): usado para leitura. O esquema a ser usado para processar a carga. Se a classificação for avro, o esquema fornecido deverá estar no formato de esquema Avro. Se a classificação não for avro, o esquema fornecido deverá estar no formato de esquema DDL.

    Veja a seguir alguns exemplos de esquema.

    Example in DDL schema format
    `column1` INT, `column2` STRING , `column3` FLOAT
    Example in Avro schema format
    { "type":"array", "items": { "type":"record", "name":"test", "fields": [ { "name":"_id", "type":"string" }, { "name":"index", "type": [ "int", "string", "float" ] } ] } }
    `column1` INT, `column2` STRING , `column3` FLOAT
  • "inferSchema": (opcional) usado para leitura. O valor padrão é "false". Se definido como “true”, o esquema será detectado em runtime com base na carga útil em foreachbatch.

  • "avroSchema": (Obsoleto) Usado para leitura. Parâmetro usado para especificar um esquema de dados Avro quando o formato Avro é usado. Esse parâmetro foi descontinuado. Use o parâmetro schema.

  • "addRecordTimestamp": (opcional) usado para leitura. Quando essa opção for definida como "true", a saída de dados conterá uma coluna adicional denominada "__src_timestamp" que indica a hora que o registro correspondente é recebido pelo fluxo. O valor padrão é "false". Essa opção é compatível com o AWS Glue versão 4.0 ou posterior.

  • "emitConsumerLagMetrics": (opcional) usado para leitura. Quando a opção for definida como "true" (verdadeira), para cada lote, serão emitidas métricas durante o período entre a hora que o registro mais antigo é recebido pelo tópico e a hora que ele chega ao AWS Glue para o CloudWatch. O nome da métrica é "glue.driver.streaming.maxConsumerLagInMs". O valor padrão é "false". Essa opção é compatível com o AWS Glue versão 4.0 ou posterior.

  • "fanoutConsumerARN": (opcional) usado para leitura. O ARN de um consumidor de fluxo do Kinesis para o fluxo especificado em streamARN. Usado para habilitar o modo de distribuição avançada para a conexão do Kinesis. Para obter mais informações sobre como consumir um fluxo do Kinesis com distribuição avançada, consulte Usar distribuição avançada nas tarefas de streaming do Kinesis.

  • "recordMaxBufferedTime" (opcional) usado para gravação. Padrão: 1000 (ms). Tempo máximo em que um registro é armazenado em buffer enquanto espera para ser gravado.

  • "aggregationEnabled" (opcional) usado para gravação. Padrão: true. Especifica se os registros devem ser agregados antes de serem enviados para o Kinesis.

  • "aggregationMaxSize" (opcional) usado para gravação. Padrão: 51200 (bytes) Se um registro for maior que esse limite, ele ignorará o agregador. Nota: O Kinesis impõe um limite de 50 KB no tamanho do registro. Se você definir isso além de 50 KB, registros grandes serão rejeitados pelo Kinesis.

  • "aggregationMaxCount" (opcional) usado para gravação. Padrão: 4294967295. O número máximo de itens a serem retornados em um registro agregado.

  • "producerRateLimit" (opcional) usado para gravação. Padrão: 150 (%). Limita o throughput por fragmento enviado por um único produtor (como seu trabalho), como uma porcentagem do limite de back-end.

  • "collectionMaxCount" (opcional) usado para gravação. Padrão: 500. Número máximo de itens a serem compactados em uma solicitação PutRecords.

  • "collectionMaxSize" (opcional) usado para gravação. Padrão: 5242880 (bytes) Quantidade máxima de dados a serem enviados com uma solicitação PutRecords.

PrivacidadeTermos do sitePreferências de cookies
© 2025, Amazon Web Services, Inc. ou suas afiliadas. Todos os direitos reservados.