

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Revise los componentes DataStream de la API
<a name="how-datastream"></a>

Su aplicación Apache Flink usa la [ DataStream API Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) para transformar los datos en un flujo de datos. 

En esta sección se describen los diferentes componentes que mueven, transforman y rastrean los datos:
+ [Utilice conectores para mover datos en Managed Service for Apache Flink con la API DataStream](how-connectors.md): estos componentes mueven los datos entre la aplicación y el origen y destino de datos externos.
+ [Transforme los datos mediante operadores en Managed Service for Apache Flink con la API DataStream](how-operators.md): estos componentes transforman o agrupan los elementos de datos dentro de la aplicación.
+ [Realice un seguimiento de los eventos en Managed Service for Apache Flink mediante la API DataStream](how-time.md): En este tema se describe cómo Managed Service for Apache Flink rastrea los eventos cuando se usa la API. DataStream 

# Utilice conectores para mover datos en Managed Service for Apache Flink con la API DataStream
<a name="how-connectors"></a>

En la DataStream API de Amazon Managed Service for Apache Flink, los *conectores* son componentes de software que mueven datos hacia y desde una aplicación de Managed Service for Apache Flink. Los conectores son integraciones flexibles que permiten leer archivos y directorios. Los conectores constan de módulos completos para interactuar con los servicios de Amazon y los sistemas de terceros.

Entre los tipos de conectores, se incluyen:
+ [Agregación de orígenes de datos de streaming](how-sources.md): proporcione datos a su aplicación desde un flujo de datos de Kinesis, un archivo u otro origen de datos.
+ [Escritura de datos mediante receptores](how-sinks.md): envíe datos desde su aplicación a un flujo de datos de Kinesis, un flujo de Firehose u otro destino de datos.
+ [Uso de E/S asíncrona](how-async.md): Proporciona acceso asíncrono a un origen de datos (como una base de datos) para enriquecer los eventos de flujos. 

## Conectores disponibles
<a name="how-connectors-list"></a>

El marco de Apache Flink contiene conectores para acceder a los datos desde una variedad de fuentes. Para obtener información sobre los conectores disponibles en el marco Apache Flink, consulte la sección [Connectors](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/) de la [documentación de Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

**aviso**  
Si tiene aplicaciones que se ejecutan en Flink 1.6, 1.8, 1.11 o 1.13 y desea ejecutarlas en las regiones de Medio Oriente (EAU), Asia Pacífico (Hyderabad), Israel (Tel Aviv), Europa (Zúrich), Medio Oriente (EAU), Asia-Pacífico (Melbourne) o Asia Pacífico (Yakarta), puede que necesite volver a compilar el archivo de su aplicación con un conector actualizado o actualizar a Flink 1.18.   
Los conectores Apache Flink se almacenan en sus propios repositorios de código fuente. Si está actualizando a la versión 1.18 o posterior, debe actualizar sus dependencias. Para acceder al repositorio de los AWS conectores de Apache Flink, consulte. [flink-connector-aws](https://github.com/apache/flink-connector-aws)  
La anterior fuente de Kinesis `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` está descontinuada y podría eliminarse en una futura versión de Flink. En su lugar, utilice [Kinesis Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source).  
No hay compatibilidad de estados entre `FlinkKinesisConsumer` y `KinesisStreamsSource`. Para obtener más información, consulte [Migrating existing jobs to new Kinesis Streams Source](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer) en la documentación de Apache Flink.  
 Las siguientes son las pautas recomendadas:   


**Actualizaciones de conectores**  

| Versión de Flink | Conector utilizado | Resolución | 
| --- | --- | --- | 
| 1.19, 1.20 | Origen de Kinesis |  Al actualizar a Amazon Managed Service para Apache Flink a la versión 1.19 y 1.20, asegúrese de utilizar el conector de origen de Kinesis Data Streams más reciente. La versión debe ser 5.0.0 o posterior. Para obtener más información, consulte [Amazon Kinesis Data Streams Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/).  | 
| 1.19, 1.20 | Sumidero Kinesis |  Al actualizar a Amazon Managed Service para Apache Flink a la versión 1.19 y 1.20, asegúrese de utilizar el concector de receptor de Kinesis Data Streams más reciente. La versión debe ser 5.0.0 o posterior. Para obtener más información, consulte [Kinesis Streams Sink](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink).  | 
| 1.19, 1.20 | Origen de DynamoDB Streams |  Al actualizar a Amazon Managed Service para Apache Flink a la versión 1.19 y 1.20, asegúrese de utilizar el conector de origen de DynamoDB Streams más reciente. La versión debe ser 5.0.0 o posterior. Para obtener más información, consulte [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/).  | 
| 1.19, 1.20 | Receptor de DynamoDB | Al actualizar a Amazon Managed Service para Apache Flink a la versión 1.19 y 1.20, asegúrese de utilizar el conector de receptor de DynamoDB más reciente. La versión debe ser 5.0.0 o posterior. Para obtener más información, consulte [Amazon DynamoDB Connector](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/). | 
| 1.19, 1.20 | Receptor de Amazon SQS |  Al actualizar a Amazon Managed Service para Apache Flink a la versión 1.19 y 1.20, asegúrese de utilizar el conector de receptor de Amazon SQS más reciente. La versión debe ser 5.0.0 o posterior. Para obtener más información, consulte [Amazon SQS Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/).  | 
| 1.19, 1.20 | Receptor de Amazon Managed Service para Prometheus |  Al actualizar a las versiones 1.19 y 1.20 de Managed Service para Apache Flink, asegúrese de utilizar el conector de receptor más reciente de Amazon Managed Service para Prometheus. La versión de debe ser 1.0.0 o posterior. Para obtener más información, consulte [Prometheus Sink](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/).  | 

# Agregación de orígenes de datos de transmisión a Managed Service para Apache Flink
<a name="how-sources"></a>

Apache Flink proporciona conectores para leer archivos, sockets, colecciones y fuentes personalizadas. En el código de su aplicación, debe utilizar una [fuente de Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources) para recibir datos de un flujo. En esta sección se describen las fuentes disponibles para los servicios de Amazon.

## Uso de Kinesis Data Streams
<a name="input-streams"></a>

La `KinesisStreamsSource` proporciona datos de transmisión a su aplicación desde un flujo de datos de Amazon Kinesis. 

### Creación de un `KinesisStreamsSource`
<a name="input-streams-create"></a>

En el siguiente código de ejemplo se muestra la creación de un `KinesisStreamsSource`:

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Para obtener más información sobre el uso de un`KinesisStreamsSource`, consulte [Amazon Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) Connector en la documentación de Apache Flink [y nuestro ejemplo KinesisConnectors público](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) en Github.

### Creación de un `KinesisStreamsSource` que utilice un consumidor de EFO
<a name="input-streams-efo"></a>

`KinesisStreamsSource` ahora es compatible con la [distribución mejorada (EFO)](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/). 

Si un consumidor de Kinesis usa EFO, el servicio Kinesis Data Streams le proporciona su propio ancho de banda dedicado, en lugar de hacer que el consumidor comparta el ancho de banda fijo del flujo con los demás consumidores que leen el flujo.

Para obtener más información sobre el uso de EFO con el Kinesis Consumer, [consulte FLIP-128: salida de ventilador mejorada](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) para consumidores de Kinesis. AWS 

Para activar el consumidor EFO, configure los siguientes parámetros en el consumidor de Kinesis:
+ **READER\$1TYPE:** defina este parámetro en **EFO** para que su aplicación utilice un consumidor de EFO para acceder a los datos del flujo de datos de Kinesis. 
+ **EFO\$1CONSUMER\$1NAME:** defina este parámetro en un valor de cadena que sea único entre los consumidores de este flujo. La reutilización de un nombre de consumidor en el mismo flujo de datos de Kinesis provocará la cancelación del consumidor anterior que utilizó ese nombre. 

A fin de configurar un `KinesisStreamsSource` para que use EFO, añada los siguientes parámetros al consumidor:

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Para ver un ejemplo de una aplicación de Managed Service para Apache Flink que utiliza un consumidor de EFO, consulte [nuestro ejemplo de conectores de Kinesis públicos en Github](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors).

## Uso de Amazon MSK
<a name="input-msk"></a>

La fuente `KafkaSource` proporciona datos de streaming a su aplicación desde un tema de Amazon MSK. 

### Creación de un `KafkaSource`
<a name="input-msk-create"></a>

En el siguiente código de ejemplo se muestra la creación de un `KafkaSource`:

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Para obtener más información sobre cómo usar un `KafkaSource`, consulte [Replicación MSK](earlier.md#example-msk).

# Escritura de datos mediante receptores en Managed Service para Apache Flink
<a name="how-sinks"></a>

En el código de la aplicación, se puede utilizar cualquier conector de[receptor de Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) para escribir en sistemas externos, incluidos los servicios AWS , como Kinesis Data Streams y DynamoDB.

Apache Flink también proporciona receptores para archivos y sockets, y se pueden implementar receptores personalizados. Entre los diversos receptores compatibles, se utilizan con frecuencia los siguientes:

## Uso de Kinesis Data Streams
<a name="sinks-streams"></a>

Apache Flink proporciona información sobre el [conector de Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/) en la documentación de Apache Flink.

Para ver un ejemplo de una aplicación que utiliza un flujo de datos de Kinesis como entrada y salida, consulte [Tutorial: Comience a utilizar la DataStream API en Managed Service for Apache Flink](getting-started.md).

## Uso de Apache Kafka y Amazon Managed Streaming para Apache Kafka (MSK)
<a name="sinks-MSK"></a>

El [conector Apache Flink Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink) ofrece un amplio soporte para publicar datos en Apache Kafka y Amazon MSK, incluidas las garantías de una sola vez. Para aprender a escribir en Kafka, consulte los [ejemplos de conectores Kafka](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors) en la documentación de Apache Flink.

## Uso de Amazon S3.
<a name="sinks-s3"></a>

Puede utilizar el `StreamingFileSink` de Apache Flink para escribir objetos en un bucket de Amazon S3.

Para ver un ejemplo sobre cómo escribir objetos en S3, consulte [Ejemplo: escritura en un bucket de Amazon S3](earlier.md#examples-s3). 

## Uso de Firehose
<a name="sinks-firehose"></a>

El `FlinkKinesisFirehoseProducer` es un receptor de Apache Flink fiable y escalable para almacenar los resultados de las aplicaciones mediante el servicio [Firehose](https://docs.aws.amazon.com/firehose/latest/dev/). En esta sección se describe cómo configurar un proyecto de Maven para crear y utilizar un `FlinkKinesisFirehoseProducer`.

**Topics**
+ [Creación de un `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [Ejemplo de código de `FlinkKinesisFirehoseProducer`](#sinks-firehose-sample)

### Creación de un `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

En el siguiente código de ejemplo se muestra la creación de un `FlinkKinesisFirehoseProducer`:

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### Ejemplo de código de `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-sample"></a>

El siguiente ejemplo de código muestra cómo crear y configurar un `FlinkKinesisFirehoseProducer` y enviar datos desde un flujo de datos de Apache Flink al servicio Firehose.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Para ver un tutorial completo sobre cómo utilizar el receptor de Firehose, consulte [Ejemplo: escritura en Firehose](earlier.md#get-started-exercise-fh).

# Utilice el modo I/O asincrónico en el servicio gestionado de Apache Flink
<a name="how-async"></a>

Un I/O operador asíncrono enriquece los datos de la transmisión mediante una fuente de datos externa, como una base de datos. Managed Service para Apache Flink enriquece los eventos del flujo de forma asíncrona para que las solicitudes se puedan agrupar en lotes y aumentar la eficiencia. 

Para obtener más información, consulte [E/S asíncrona](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/) en la documentación de Apache Flink.

# Transforme los datos mediante operadores en Managed Service for Apache Flink con la API DataStream
<a name="how-operators"></a>

Para transformar los datos entrantes en un Managed Service para Apache Flink, utiliza un *operador* de Apache Flink. Un operador de Apache Flink transforma uno o más flujos de datos en un nuevo flujo de datos. El nuevo flujo de datos contiene datos modificados del flujo de datos original. Apache Flink proporciona más de 25 operadores de procesamiento de flujos prediseñados. Para obtener más información, consulte [Operators](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) en la documentación de Apache Flink.

**Topics**
+ [Uso de operadores de transformación](#how-operators-transform)
+ [Uso de operadores de agregación](#how-operators-agg)

## Uso de operadores de transformación
<a name="how-operators-transform"></a>

El siguiente es un ejemplo de una transformación de texto simple en uno de los campos de un flujo de datos JSON. 

Este código crea un flujo de datos transformado. El nuevo flujo de datos tiene los mismos datos que el flujo original, con la cadena “` Company`” anexada al contenido del campo `TICKER`.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## Uso de operadores de agregación
<a name="how-operators-agg"></a>

A continuación se muestra un ejemplo de operador de agregación. El código crea un flujo de datos agregado. El operador crea una ventana de caída de 5 segundos y devuelve la suma de los valores `PRICE` de los registros de la ventana con el mismo valor `TICKER`.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

Para obtener ejemplos de código, consulte [Ejemplos de cómo crear y utilizar aplicaciones en Managed Service para Apache Flink](examples-collapsibles.md). 

# Realice un seguimiento de los eventos en Managed Service for Apache Flink mediante la API DataStream
<a name="how-time"></a>

Managed Service para Apache Flink realiza un seguimiento de los eventos mediante las siguientes marcas de tiempo:
+ **Hora de procesamiento:** se refiere a la hora del sistema de la máquina que está ejecutando la operación correspondiente.
+ **Hora del evento:** se refiere a la hora en que ocurrió cada evento individual en el dispositivo que lo produjo.
+ **Hora de adquisición de datos:** se refiere al momento en que los eventos ingresan al servicio Managed Service para Apache Flink.

El tiempo utilizado por el entorno de streaming se establece mediante`setStreamTimeCharacteristic`. 

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

Para obtener más información sobre marcas de tiempo, consulte [Generating Watermarks](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/) en la documentación de Apache Fink.