

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Agregación de orígenes de datos de transmisión a Managed Service para Apache Flink
<a name="how-sources"></a>

Apache Flink proporciona conectores para leer archivos, sockets, colecciones y fuentes personalizadas. En el código de su aplicación, debe utilizar una [fuente de Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources) para recibir datos de un flujo. En esta sección se describen las fuentes disponibles para los servicios de Amazon.

## Uso de Kinesis Data Streams
<a name="input-streams"></a>

La `KinesisStreamsSource` proporciona datos de transmisión a su aplicación desde un flujo de datos de Amazon Kinesis. 

### Creación de un `KinesisStreamsSource`
<a name="input-streams-create"></a>

En el siguiente código de ejemplo se muestra la creación de un `KinesisStreamsSource`:

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Para obtener más información sobre el uso de un`KinesisStreamsSource`, consulte [Amazon Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) Connector en la documentación de Apache Flink [y nuestro ejemplo KinesisConnectors público](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) en Github.

### Creación de un `KinesisStreamsSource` que utilice un consumidor de EFO
<a name="input-streams-efo"></a>

`KinesisStreamsSource` ahora es compatible con la [distribución mejorada (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/). 

Si un consumidor de Kinesis usa EFO, el servicio Kinesis Data Streams le proporciona su propio ancho de banda dedicado, en lugar de hacer que el consumidor comparta el ancho de banda fijo del flujo con los demás consumidores que leen el flujo.

Para obtener más información sobre el uso de EFO con el Kinesis Consumer, [consulte FLIP-128: salida de ventilador mejorada](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) para consumidores de Kinesis. AWS 

Para activar el consumidor EFO, configure los siguientes parámetros en el consumidor de Kinesis:
+ **READER\$1TYPE:** defina este parámetro en **EFO** para que su aplicación utilice un consumidor de EFO para acceder a los datos del flujo de datos de Kinesis. 
+ **EFO\$1CONSUMER\$1NAME:** defina este parámetro en un valor de cadena que sea único entre los consumidores de este flujo. La reutilización de un nombre de consumidor en el mismo flujo de datos de Kinesis provocará la cancelación del consumidor anterior que utilizó ese nombre. 

A fin de configurar un `KinesisStreamsSource` para que use EFO, añada los siguientes parámetros al consumidor:

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Para ver un ejemplo de una aplicación de Managed Service para Apache Flink que utiliza un consumidor de EFO, consulte [nuestro ejemplo de conectores de Kinesis públicos en Github](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors).

## Uso de Amazon MSK
<a name="input-msk"></a>

La fuente `KafkaSource` proporciona datos de streaming a su aplicación desde un tema de Amazon MSK. 

### Creación de un `KafkaSource`
<a name="input-msk-create"></a>

En el siguiente código de ejemplo se muestra la creación de un `KafkaSource`:

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Para obtener más información sobre cómo usar un `KafkaSource`, consulte [Replicación MSK](earlier.md#example-msk).