

Las traducciones son generadas a través de traducción automática. En caso de conflicto entre la traducción y la version original de inglés, prevalecerá la version en inglés.

# Escritura de datos mediante receptores en Managed Service para Apache Flink
<a name="how-sinks"></a>

En el código de la aplicación, se puede utilizar cualquier conector de[receptor de Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) para escribir en sistemas externos, incluidos los servicios AWS , como Kinesis Data Streams y DynamoDB.

Apache Flink también proporciona receptores para archivos y sockets, y se pueden implementar receptores personalizados. Entre los diversos receptores compatibles, se utilizan con frecuencia los siguientes:

## Uso de Kinesis Data Streams
<a name="sinks-streams"></a>

Apache Flink proporciona información sobre el [conector de Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/) en la documentación de Apache Flink.

Para ver un ejemplo de una aplicación que utiliza un flujo de datos de Kinesis como entrada y salida, consulte [Tutorial: Comience a utilizar la DataStream API en Managed Service for Apache Flink](getting-started.md).

## Uso de Apache Kafka y Amazon Managed Streaming para Apache Kafka (MSK)
<a name="sinks-MSK"></a>

El [conector Apache Flink Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink) ofrece un amplio soporte para publicar datos en Apache Kafka y Amazon MSK, incluidas las garantías de una sola vez. Para aprender a escribir en Kafka, consulte los [ejemplos de conectores Kafka](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors) en la documentación de Apache Flink.

## Uso de Amazon S3.
<a name="sinks-s3"></a>

Puede utilizar el `StreamingFileSink` de Apache Flink para escribir objetos en un bucket de Amazon S3.

Para ver un ejemplo sobre cómo escribir objetos en S3, consulte [Ejemplo: escritura en un bucket de Amazon S3](earlier.md#examples-s3). 

## Uso de Firehose
<a name="sinks-firehose"></a>

El `FlinkKinesisFirehoseProducer` es un receptor de Apache Flink fiable y escalable para almacenar los resultados de las aplicaciones mediante el servicio [Firehose](https://docs.aws.amazon.com/firehose/latest/dev/). En esta sección se describe cómo configurar un proyecto de Maven para crear y utilizar un `FlinkKinesisFirehoseProducer`.

**Topics**
+ [Creación de un `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [Ejemplo de código de `FlinkKinesisFirehoseProducer`](#sinks-firehose-sample)

### Creación de un `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

En el siguiente código de ejemplo se muestra la creación de un `FlinkKinesisFirehoseProducer`:

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### Ejemplo de código de `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-sample"></a>

El siguiente ejemplo de código muestra cómo crear y configurar un `FlinkKinesisFirehoseProducer` y enviar datos desde un flujo de datos de Apache Flink al servicio Firehose.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Para ver un tutorial completo sobre cómo utilizar el receptor de Firehose, consulte [Ejemplo: escritura en Firehose](earlier.md#get-started-exercise-fh).