

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Revise os componentes DataStream da API
<a name="how-datastream"></a>

Seu aplicativo Apache Flink usa a [ DataStream API Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) para transformar dados em um fluxo de dados. 

Esta seção descreve os diferentes componentes que movem, transformam e rastreiam os dados:
+ [Use conectores para mover dados no Managed Service for Apache Flink com a API DataStream](how-connectors.md): esses componentes movem dados entre seu aplicativo e fontes de dados e destinos externos.
+ [Transforme dados usando operadores no Managed Service for Apache Flink com a API DataStream](how-operators.md): esses componentes transformam ou agrupam elementos de dados em seu aplicativo.
+ [Acompanhe eventos no Managed Service para Apache Flink usando a API DataStream](how-time.md): este tópico descreve como o Managed Service for Apache Flink rastreia eventos ao usar a DataStream API.

# Use conectores para mover dados no Managed Service for Apache Flink com a API DataStream
<a name="how-connectors"></a>

Na DataStream API Amazon Managed Service for Apache Flink, *conectores* são componentes de software que movem dados para dentro e para fora de um aplicativo Managed Service for Apache Flink. Os conectores são integrações flexíveis que permitem a leitura de arquivos e diretórios. Os conectores consistem em módulos completos para interagir com os serviços da Amazon e sistemas de terceiros.

Os tipos de conectores incluem o seguinte:
+ [Adicione fontes de dados de transmissão](how-sources.md): forneça dados para seu aplicativo a partir de um fluxo de dados do Kinesis, arquivo ou de outra fonte de dados.
+ [Grave dados usando coletores ](how-sinks.md): envie dados do seu aplicativo para um fluxo de dados do Kinesis, fluxo do Firehose ou outro destino de dados.
+ [Use E/S assíncrona](how-async.md): fornece acesso assíncrono a uma fonte de dados (como um banco de dados) para enriquecer os eventos de fluxo. 

## Conectores disponíveis
<a name="how-connectors-list"></a>

A estrutura do Apache Flink contém conectores para acessar dados de várias fontes. Para obter informações sobre conectores disponíveis na estrutura do Apache Flink, consulte [Conectores](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/) na [Documentação do Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

**Atenção**  
Se você tem aplicativos em execução no Flink 1.6, 1.8, 1.11 ou 1.13 e gostaria de executar nas regiões do Oriente Médio (EAU), Ásia-Pacífico (Hyderabad), Israel (Tel Aviv), Europa (Zurique), Oriente Médio (EAU), Ásia-Pacífico (Melbourne) e Ásia-Pacífico (Jacarta), talvez seja necessário recompilar seu archive de aplicativos com um conector atualizado ou fazer o upgrade para o Flink 1.18.   
Os conectores Apache Flink são armazenados em seus próprios repositórios de código aberto. Se você estiver atualizando para a versão 1.18 ou posterior, deverá atualizar suas dependências. Para acessar o repositório dos AWS conectores Apache Flink, consulte. [flink-connector-aws](https://github.com/apache/flink-connector-aws)  
A antiga fonte `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` do Kinesis foi descontinuada e pode ser removida com uma versão futura do Flink. Em vez disso, use o [Kinesis Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source).  
Não há compatibilidade de estado entre `FlinkKinesisConsumer` e `KinesisStreamsSource`. Para obter detalhes, consulte [Migração de trabalhos existentes para a nova fonte do Kinesis Streams](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer) na documentação do Apache Flink.  
 A seguir estão as diretrizes recomendadas:   


**Atualizações de conectores**  

| Versão do Flink | Conector usado | Resolução | 
| --- | --- | --- | 
| 1.19, 1.20 | Fonte do Kinesis  |  Ao fazer o upgrade para a versão 1.19 e 1.20 do Managed Service for Apache Flink, verifique se você está usando o conector mais recente do Kinesis Data Streams. Ele deve ser da versão 5.0.0 e posteriores. Para obter mais informações, consulte [Conector do Amazon Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/).  | 
| 1.19, 1.20 | Coletor do Kinesis |  Ao fazer o upgrade para a versão 1.19 e 1.20 do Managed Service for Apache Flink, verifique se você está usando o conector de coletor mais recente do Kinesis Data Streams. Ele deve ser da versão 5.0.0 e posteriores. Para obter mais informações, consulte [Coletor de fluxos do Kinesis](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink).  | 
| 1.19, 1.20 | Fonte do DynamoDB Streams |  Ao fazer o upgrade para a versão 1.19 e 1.20 do Managed Service for Apache Flink, verifique se você está usando o conector de fonte mais recente do DynamoDB Streams. Ele deve ser da versão 5.0.0 e posteriores. Para ter mais informações, consulte [conector do Amazon DynamoDB](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/).  | 
| 1.19, 1.20 | Coletor do DynamoDB  | Ao fazer o upgrade para a versão 1.19 e 1.20 do Managed Service for Apache Flink, verifique se você está usando o conector de coletor mais recente do DynamoDB. Ele deve ser da versão 5.0.0 e posteriores. Para ter mais informações, consulte [conector do Amazon DynamoDB](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/). | 
| 1.19, 1.20 | Coletor do Amazon SQS |  Ao fazer o upgrade para a versão 1.19 e 1.20 do Managed Service for Apache Flink, verifique se você está usando o conector de coletor mais recente do Amazon SQS. Ele deve ser da versão 5.0.0 e posteriores. Para obter mais informações, consulte [Coletor do Amazon SQS](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/).  | 
| 1.19, 1.20 | Coletor do Amazon Managed Service para Prometheus |  Ao fazer o upgrade para a versão 1.19 e 1.20 do Managed Service for Apache Flink, verifique se você está usando o conector de coletor mais recente do Amazon Managed Service for Prometheus. Ele deve ser da versão 1.0.0 e posteriores. Para obter mais informações, consulte [Coletor do Prometheus](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/).  | 

# Adicione fontes de dados de transmissão ao Managed Service for Apache Flink
<a name="how-sources"></a>

O Apache Flink fornece conectores para leitura de arquivos, soquetes, coleções e fontes personalizadas. No código do seu aplicativo, você usa uma [fonte do Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources) para receber dados de um fluxo. Esta seção descreve as fontes que estão disponíveis para os serviços da Amazon.

## Use fluxos de dados do Kinesis
<a name="input-streams"></a>

A `KinesisStreamsSource` fornece dados de transmissão para seu aplicativo a partir de um fluxo de dados do Amazon Kinesis. 

### Criar uma `KinesisStreamsSource`
<a name="input-streams-create"></a>

O exemplo de código a seguir demonstra como criar um `KinesisStreamsSource`:

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Para obter mais informações sobre o uso de um`KinesisStreamsSource`, consulte o [Amazon Kinesis Data](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) Streams Connector na [documentação do Apache Flink e KinesisConnectors ](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) nosso exemplo público no Github.

### Crie um `KinesisStreamsSource` que usa um consumidor EFO
<a name="input-streams-efo"></a>

O `KinesisStreamsSource` agora oferece suporte ao [Enhanced Fan-Out (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/). 

Se um consumidor do Kinesis usa o EFO, o serviço Kinesis Data Streams fornece sua própria largura de banda dedicada, em vez de fazer com que o consumidor compartilhe a largura de banda fixa do stream com os outros consumidores que estão lendo o stream.

Para obter mais informações sobre como usar o EFO com o consumidor Kinesis, [consulte FLIP-128: Enhanced Fan Out](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) for Kinesis Consumers. AWS 

Você habilita o consumidor EFO definindo os seguintes parâmetros no consumidor do Kinesis:
+ **READER\$1TYPE: ** defina esse parâmetro como **EFO** para que seu aplicativo use um consumidor EFO para acessar os dados do Kinesis Data Stream. 
+ **EFO\$1CONSUMER\$1NAME:** defina esse parâmetro como um valor de sequência de caracteres que é exclusivo entre os consumidores desse fluxo. A reutilização de um nome de consumidor no mesmo Kinesis Data Stream fará com que o consumidor anterior que usava esse nome seja excluído. 

Para configurar um `KinesisStreamsSource` para usar o EFO, adicione os seguintes parâmetros ao consumidor:

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Para obter um exemplo de um aplicativo do Managed Service for Apache Flink que usa um consumidor EFO, consulte [nosso exemplo público de Conectores do Kinesis no Github](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors).

## Use o Amazon MSK
<a name="input-msk"></a>

A fonte `KafkaSource` fornece dados de transmissão para seu aplicativo a partir de um tópico do Amazon MSK. 

### Criar uma `KafkaSource`
<a name="input-msk-create"></a>

O exemplo de código a seguir demonstra como criar um `KafkaSource`:

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Para obter mais informações sobre como usar uma `KafkaSource`, consulte [Replicação do MSK](earlier.md#example-msk).

# Grave dados com coletores no Managed Service for Apache Flink
<a name="how-sinks"></a>

No código do seu aplicativo, você pode usar qualquer conector de [coletor do Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) para gravar em sistemas externos, incluindo serviços do AWS , como Kinesis Data Streams e DynamoDB.

O Apache Flink também fornece coletores para arquivos e soquetes, e você pode implementar coletores personalizados. Entre os vários coletores suportados, os seguintes são usados com frequência: 

## Use fluxos de dados do Kinesis
<a name="sinks-streams"></a>

O Apache Flink fornece informações sobre o [conector do Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/) na documentação do Apache Flink.

Para obter um exemplo de um aplicativo que usa um fluxo de dados Kinesis para entrada e saída, consulte [Tutorial: Comece a usar a DataStream API no Managed Service para Apache Flink](getting-started.md).

## Use Amazon Kafka e Amazon Managed Streaming for Apache Kafka (MSK)
<a name="sinks-MSK"></a>

O [conector do Apache Flink Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink) fornece amplo suporte para publicação de dados no Apache Kafka e no Amazon MSK, incluindo garantias “exatamente uma vez”. Para aprender a gravar no Kafka, consulte [exemplos de conectores do Kafka](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors) na documentação do Apache Flink.

## Use o Amazon S3.
<a name="sinks-s3"></a>

É possível utilizar o `StreamingFileSink` do Apache Flink para gravar objetos em um bucket do Amazon S3.

Para obter um exemplo sobre como gravar objetos no S3, consulte [Exemplo: gravação em um bucket do Amazon S3](earlier.md#examples-s3). 

## Use o Firehose
<a name="sinks-firehose"></a>

O `FlinkKinesisFirehoseProducer` é um coletor do Apache Flink confiável e escalável para armazenar a saída do aplicativo usando o serviço [Firehose](https://docs.aws.amazon.com/firehose/latest/dev/). Esta seção descreve como configurar um projeto do Maven para criar e utilizar um `FlinkKinesisFirehoseProducer`.

**Topics**
+ [Criar uma `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [Exemplo de código `FlinkKinesisFirehoseProducer`](#sinks-firehose-sample)

### Criar uma `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

O exemplo de código a seguir demonstra como criar um `FlinkKinesisFirehoseProducer`:

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### Exemplo de código `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-sample"></a>

O exemplo de código a seguir demonstra como criar e configurar um `FlinkKinesisFirehoseProducer` e enviar dados de um fluxo de dados do Apache Flink para o serviço Firehose.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Para ver um tutorial completo sobre como usar o coletor do Firehose, consulte [Exemplo: gravação no Firehose](earlier.md#get-started-exercise-fh).

# Use I/O assíncrono no serviço gerenciado para Apache Flink
<a name="how-async"></a>

Um I/O operador assíncrono enriquece os dados do stream usando uma fonte de dados externa, como um banco de dados. O Managed Service for Apache Flink enriquece os eventos de transmissão de forma assíncrona para que as solicitações possam ser agrupadas em lotes para maior eficiência. 

Para obter mais informações, consulte [E/S assíncrona](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/) na Documentação do Apache Flink.

# Transforme dados usando operadores no Managed Service for Apache Flink com a API DataStream
<a name="how-operators"></a>

Para transformar os dados recebidos em um Managed Service for Apache Flink, você usa um *operador* do Apache Flink. Um operador do Apache Flink transforma um ou mais fluxos de dados em um novo fluxo de dados. O novo fluxo de dados contém dados modificados do fluxo de dados original. O Apache Flink fornece mais de 25 operadores de processamento de fluxo pré-definidos. Para obter mais informações, consulte [Operadores](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) na Documentação do Apache Flink.

**Topics**
+ [Use operadores de transformação](#how-operators-transform)
+ [Use operadores de agregação](#how-operators-agg)

## Use operadores de transformação
<a name="how-operators-transform"></a>

Veja a seguir um exemplo de uma transformação de texto simples em um dos campos de um fluxo de dados JSON. 

Esse código cria um fluxo de dados transformado. O novo fluxo de dados tem os mesmos dados do fluxo original, com a string “` Company`" anexada ao conteúdo do campo `TICKER`.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## Use operadores de agregação
<a name="how-operators-agg"></a>

Este é um exemplo de um operador de agregação. O código cria um fluxo de dados agregado. O operador cria uma janela em cascata de 5 segundos e retorna a soma dos valores de `PRICE` dos registros na janela com o mesmo valor de `TICKER`.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

Para obter mais exemplos de código, consulte [Exemplos de como criar e trabalhar com aplicativos no Managed Service for Apache Flink.](examples-collapsibles.md). 

# Acompanhe eventos no Managed Service para Apache Flink usando a API DataStream
<a name="how-time"></a>

O Managed Service for Apache Flink rastreia eventos usando os seguintes timestamps:
+ **Tempo de processamento:** refere-se à hora do sistema da máquina que está executando a respectiva operação.
+ **Hora do evento:** refere-se à hora em que cada evento individual ocorreu em seu dispositivo produtor.
+ **Tempo de ingestão:** refere-se à hora em que os eventos entram no serviço Managed Service for Apache Flink.

Você define o tempo usado pelo ambiente de streaming usando`setStreamTimeCharacteristic`. 

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

Para obter mais informações sobre timestamps, consulte [Geração de marcas d’água](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/) na Documentação do Apache Flink.