

As traduções são geradas por tradução automática. Em caso de conflito entre o conteúdo da tradução e da versão original em inglês, a versão em inglês prevalecerá.

# Adicione fontes de dados de transmissão ao Managed Service for Apache Flink
<a name="how-sources"></a>

O Apache Flink fornece conectores para leitura de arquivos, soquetes, coleções e fontes personalizadas. No código do seu aplicativo, você usa uma [fonte do Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources) para receber dados de um fluxo. Esta seção descreve as fontes que estão disponíveis para os serviços da Amazon.

## Use fluxos de dados do Kinesis
<a name="input-streams"></a>

A `KinesisStreamsSource` fornece dados de transmissão para seu aplicativo a partir de um fluxo de dados do Amazon Kinesis. 

### Criar uma `KinesisStreamsSource`
<a name="input-streams-create"></a>

O exemplo de código a seguir demonstra como criar um `KinesisStreamsSource`:

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Para obter mais informações sobre o uso de um`KinesisStreamsSource`, consulte o [Amazon Kinesis Data](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) Streams Connector na [documentação do Apache Flink e KinesisConnectors ](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) nosso exemplo público no Github.

### Crie um `KinesisStreamsSource` que usa um consumidor EFO
<a name="input-streams-efo"></a>

O `KinesisStreamsSource` agora oferece suporte ao [Enhanced Fan-Out (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/). 

Se um consumidor do Kinesis usa o EFO, o serviço Kinesis Data Streams fornece sua própria largura de banda dedicada, em vez de fazer com que o consumidor compartilhe a largura de banda fixa do stream com os outros consumidores que estão lendo o stream.

Para obter mais informações sobre como usar o EFO com o consumidor Kinesis, [consulte FLIP-128: Enhanced Fan Out](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) for Kinesis Consumers. AWS 

Você habilita o consumidor EFO definindo os seguintes parâmetros no consumidor do Kinesis:
+ **READER\$1TYPE: ** defina esse parâmetro como **EFO** para que seu aplicativo use um consumidor EFO para acessar os dados do Kinesis Data Stream. 
+ **EFO\$1CONSUMER\$1NAME:** defina esse parâmetro como um valor de sequência de caracteres que é exclusivo entre os consumidores desse fluxo. A reutilização de um nome de consumidor no mesmo Kinesis Data Stream fará com que o consumidor anterior que usava esse nome seja excluído. 

Para configurar um `KinesisStreamsSource` para usar o EFO, adicione os seguintes parâmetros ao consumidor:

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Para obter um exemplo de um aplicativo do Managed Service for Apache Flink que usa um consumidor EFO, consulte [nosso exemplo público de Conectores do Kinesis no Github](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors).

## Use o Amazon MSK
<a name="input-msk"></a>

A fonte `KafkaSource` fornece dados de transmissão para seu aplicativo a partir de um tópico do Amazon MSK. 

### Criar uma `KafkaSource`
<a name="input-msk-create"></a>

O exemplo de código a seguir demonstra como criar um `KafkaSource`:

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Para obter mais informações sobre como usar uma `KafkaSource`, consulte [Replicação do MSK](earlier.md#example-msk).