É possível usar uma conexão do Kinesis para ler e gravar o Amazon Kinesis Data Streams usando informações armazenadas em uma tabela do Data Catalog ou fornecendo informações para acessar diretamente o fluxo de dados. Você pode ler informações do Kinesis em um Spark DataFrame e depois convertê-las em um Glue DynamicFrame AWS. Você pode gravar DynamicFrames no Kinesis em um formato JSON. Se você acessar diretamente o fluxo de dados, use essas opções para fornecer as informações sobre como acessar o fluxo de dados.
Se você usar getCatalogSource
ou create_data_frame_from_catalog
para consumir registros de uma fonte de transmissão do Kinesis, o trabalho tem o banco de dados do catálogo de dados e as informações de nome da tabela, e pode usá-los para obter alguns parâmetros básicos para leitura da fonte de transmissão do Kinesis. Se você usar getSource
, getSourceWithFormat
, createDataFrameFromOptions
ou create_data_frame_from_options
, será necessário especificar esses parâmetros básicos usando as opções de conexão descritas aqui.
Você pode especificar as opções de conexão para o Kinesis usando os seguintes argumentos para os métodos especificados na classe GlueContext
.
-
Scala
-
connectionOptions
: usar comgetSource
,createDataFrameFromOptions
,getSink
-
additionalOptions
: usar comgetCatalogSource
,getCatalogSink
-
options
: usar comgetSourceWithFormat
,getSinkWithFormat
-
-
Python
-
connection_options
: usar comcreate_data_frame_from_options
,write_dynamic_frame_from_options
-
additional_options
: usar comcreate_data_frame_from_catalog
,write_dynamic_frame_from_catalog
-
options
: usar comgetSource
,getSink
-
Para notas e restrições sobre trabalhos de ETL de streaming, consulte Notas e restrições sobre ETL de transmissão.
Configurar o Kinesis
Para ler um fluxo de dados do Kinesis em uma trabalho do AWS Glue Spark, você precisará de alguns pré-requisitos:
Se for leitura, o trabalho do AWS Glue deve ter permissões IAM do nível de acesso de leitura ao fluxo de dados do Kinesis.
Se for gravação, o trabalho do AWS Glue deve ter permissões IAM do nível de acesso de gravação ao fluxo de dados do Kinesis.
Em certos casos, você precisará configurar pré-requisitos adicionais:
-
Se o trabalho do AWS Glue estiver configurado com conexões de rede adicionais (normalmente para se conectar a outros conjuntos de dados) e uma dessas conexões fornecer opções de rede da Amazon VPC, isso direcionará o trabalho para se comunicar pela Amazon VPC. Nesse caso, você também precisará configurar o fluxo de dados do Kinesis para se comunicar pela Amazon VPC. É possível fazer isso criando um endpoint da VPC de interface entre a Amazon VPC e o fluxo de dados do Kinesis. Para obter mais informações, consulte Using Kinesis Data Streams with Interface VPC Endpoints.
-
Ao especificar Amazon Kinesis Data Streams em outra conta, você deve configurar os perfis e políticas para permitir o acesso entre contas. Para obter mais informações, consulte Exemplo: Ler de uma transmissão do Kinesis em outra conta.
Para obter mais informações sobre pré-requisitos de trabalho de ETL de streaming, consulte Trabalhos de transmissão de ETL no AWS Glue.
Exemplo: ler de fluxos do Kinesis
Exemplo: ler de fluxos do Kinesis
Usado em conjunto com forEachBatch.
Exemplo para fonte de transmissão do Amazon Kinesis:
kinesis_options =
{ "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
"startingPosition": "TRIM_HORIZON",
"inferSchema": "true",
"classification": "json"
}
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
Exemplo: gravação em streams do Kinesis
Exemplo: ler de fluxos do Kinesis
Usado em conjunto com forEachBatch.
Exemplo para fonte de transmissão do Amazon Kinesis:
kinesis_options =
{ "streamARN": "arn:aws:kinesis:us-east-2:777788889999:stream/fromOptionsStream",
"startingPosition": "TRIM_HORIZON",
"inferSchema": "true",
"classification": "json"
}
data_frame_datasource0 = glueContext.create_data_frame.from_options(connection_type="kinesis", connection_options=kinesis_options)
Referência de opções de conexão do Kinesis
Designa opções de conexão para o Amazon Kinesis Data Streams.
Use as seguintes opções de conexão para fontes de dados de transmissão do Kinesis:
-
"streamARN"
(obrigatório) usado para leitura/gravação. O ARN do fluxo de dados do Kinesis. -
"classification"
(Obrigatório para leitura) Usado para leitura. O formato de arquivo usado pelos dados no registro. Obrigatório, a menos que fornecido por meio do catálogo de dados. -
"streamName"
(opcional) usado para leitura. O nome de um fluxo de dados do Kinesis de onde ler. Usado comendpointUrl
. -
"endpointUrl"
(opcional) usado para leitura. Padrão: “https://kinesis.us-east-1.amazonaws.com” O endpoint AWS do stream do Kinesis. Você não precisa alterar isso, a menos que esteja se conectando a uma região especial. -
"partitionKey"
(opcional) usado para gravação. A chave de partição do Kinesis usada na produção de registros. -
"delimiter"
(opcional) usado para leitura. O separador de valores usado quando aclassification
é CSV. O padrão é ",
". -
"startingPosition"
: (opcional) usado para leitura. A posição inicial no fluxo de dados do Kinesis de onde ler os dados. Os valores possíveis são"latest"
,"trim_horizon"
,"earliest"
ou uma string de timestamp no formato UTC no padrãoyyyy-mm-ddTHH:MM:SSZ
(ondeZ
representa um desvio do fuso horário UTC com +/-). Por exemplo: "2023-04-04T08:00:00-04:00"). O valor padrão é"latest"
. Observação: a string de timestamp no formato UTC para"startingPosition"
é compatível somente com a versão 4.0 ou posterior do AWS Glue. -
"failOnDataLoss"
: (Opcional) Falha na tarefa se algum fragmento ativo estiver ausente ou expirado. O valor padrão é"false"
. -
"awsSTSRoleARN"
: (opcional) usado para leitura/gravação. O nome de recurso da Amazon (ARN) da função a ser assumida com o uso do AWS Security Token Service (AWS STS). Essa função deve ter permissões para descrever ou ler operações de registro para o fluxo de dados do Kinesis. Você deve usar esse parâmetro ao acessar um fluxo de dados em uma conta diferente. Usado em conjunto com"awsSTSSessionName"
. -
"awsSTSSessionName"
: (opcional) usado para leitura/gravação. Um identificador para a sessão que assume a função usando o AWS STS. Você deve usar esse parâmetro ao acessar um fluxo de dados em uma conta diferente. Usado em conjunto com"awsSTSRoleARN"
. -
"awsSTSEndpoint"
: (Opcional) O AWS STS endpoint a ser usado ao se conectar ao Kinesis com uma função assumida. Isso permite usar o AWS STS endpoint regional em uma VPC, o que não é possível com o endpoint global padrão. -
"maxFetchTimeInMs"
: (opcional) usado para leitura. O tempo máximo para o executor do trabalho ler registros referentes ao lote atual do fluxo de dados do Kinesis especificado em milissegundos (ms). Várias chamadas de APIGetRecords
podem ser feitas nesse período. O valor padrão é1000
. -
"maxFetchRecordsPerShard"
: (opcional) usado para leitura. O número máximo de registros a serem obtidos por fragmento no fluxo de dados do Kinesis por microlote. Observação: o cliente poderá exceder esse limite se o trabalho de streaming já tiver lido registros extras do Kinesis (na mesma chamada get-records). SemaxFetchRecordsPerShard
precisa ser rigoroso, então precisa ser um múltiplo demaxRecordPerRead
. O valor padrão é100000
. -
"maxRecordPerRead"
: (opcional) usado para leitura. O número máximo de registros a serem obtidos por fragmento no fluxo de dados do Kinesis em cada operaçãogetRecords
. O valor padrão é10000
. -
"addIdleTimeBetweenReads"
: (opcional) usado para leitura. Adiciona um atraso de tempo entre duas operaçõesgetRecords
. O valor padrão é"False"
. Essa opção só pode ser configurada para o Glue versão 2.0 e posterior. -
"idleTimeBetweenReadsInMs"
: (opcional) usado para leitura. O atraso mínimo entre duas operações , especificado em ms. O valor padrão é1000
. Essa opção só pode ser configurada para o Glue versão 2.0 e posterior. -
"describeShardInterval"
: (opcional) usado para leitura. O intervalo de tempo mínimo entre duas chamadas de APIListShards
para que seu script considere a refragmentação. Para obter mais informações, consulte Estratégias para refragmentação no Guia do desenvolvedor do Amazon Kinesis Data Streams. O valor padrão é1s
. -
"numRetries"
: (opcional) usado para leitura. O número máximo de novas tentativas para solicitações de API do Kinesis Data Streams. O valor padrão é3
. -
"retryIntervalMs"
: (opcional) usado para leitura. O período de espera (especificado em ms) antes de repetir a chamada da API Kinesis Data Streams. O valor padrão é1000
. -
"maxRetryIntervalMs"
: (opcional) usado para leitura. O período de espera máximo (especificado em ms) entre duas tentativas de uma chamada de API Kinesis Data Streams. O valor padrão é10000
. -
"avoidEmptyBatches"
: (opcional) usado para leitura. Evita a criação de um trabalho de micro lote vazio verificando se há dados não lidos no fluxo de dados do Kinesis antes de o lote ser iniciado. O valor padrão é"False"
. -
"schema"
: (Obrigatório quando inferSchema é definido como false): usado para leitura. O esquema a ser usado para processar a carga. Se a classificação foravro
, o esquema fornecido deverá estar no formato de esquema Avro. Se a classificação não foravro
, o esquema fornecido deverá estar no formato de esquema DDL.Veja a seguir alguns exemplos de esquema.
`column1` INT, `column2` STRING , `column3` FLOAT
-
"inferSchema"
: (opcional) usado para leitura. O valor padrão é "false". Se definido como “true”, o esquema será detectado em runtime com base na carga útil emforeachbatch
. -
"avroSchema"
: (Obsoleto) Usado para leitura. Parâmetro usado para especificar um esquema de dados Avro quando o formato Avro é usado. Esse parâmetro foi descontinuado. Use o parâmetroschema
. -
"addRecordTimestamp"
: (opcional) usado para leitura. Quando essa opção for definida como "true", a saída de dados conterá uma coluna adicional denominada "__src_timestamp" que indica a hora que o registro correspondente é recebido pelo fluxo. O valor padrão é "false". Essa opção é compatível com o AWS Glue versão 4.0 ou posterior. -
"emitConsumerLagMetrics"
: (opcional) usado para leitura. Quando a opção for definida como "true" (verdadeira), para cada lote, serão emitidas métricas durante o período entre a hora que o registro mais antigo é recebido pelo tópico e a hora que ele chega ao AWS Glue para o CloudWatch. O nome da métrica é "glue.driver.streaming.maxConsumerLagInMs". O valor padrão é "false". Essa opção é compatível com o AWS Glue versão 4.0 ou posterior. -
"fanoutConsumerARN"
: (opcional) usado para leitura. O ARN de um consumidor de fluxo do Kinesis para o fluxo especificado emstreamARN
. Usado para habilitar o modo de distribuição avançada para a conexão do Kinesis. Para obter mais informações sobre como consumir um fluxo do Kinesis com distribuição avançada, consulte Usar distribuição avançada nas tarefas de streaming do Kinesis. -
"recordMaxBufferedTime"
(opcional) usado para gravação. Padrão: 1000 (ms). Tempo máximo em que um registro é armazenado em buffer enquanto espera para ser gravado. -
"aggregationEnabled"
(opcional) usado para gravação. Padrão: true. Especifica se os registros devem ser agregados antes de serem enviados para o Kinesis. -
"aggregationMaxSize"
(opcional) usado para gravação. Padrão: 51200 (bytes) Se um registro for maior que esse limite, ele ignorará o agregador. Nota: O Kinesis impõe um limite de 50 KB no tamanho do registro. Se você definir isso além de 50 KB, registros grandes serão rejeitados pelo Kinesis. -
"aggregationMaxCount"
(opcional) usado para gravação. Padrão: 4294967295. O número máximo de itens a serem retornados em um registro agregado. -
"producerRateLimit"
(opcional) usado para gravação. Padrão: 150 (%). Limita o throughput por fragmento enviado por um único produtor (como seu trabalho), como uma porcentagem do limite de back-end. -
"collectionMaxCount"
(opcional) usado para gravação. Padrão: 500. Número máximo de itens a serem compactados em uma solicitação PutRecords. -
"collectionMaxSize"
(opcional) usado para gravação. Padrão: 5242880 (bytes) Quantidade máxima de dados a serem enviados com uma solicitação PutRecords.