

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Menulis data menggunakan sink di Managed Service untuk Apache Flink
<a name="how-sinks"></a>

Dalam kode aplikasi Anda, Anda dapat menggunakan konektor [sink Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) untuk menulis ke sistem eksternal, termasuk AWS layanan, seperti Kinesis Data Streams dan DynamoDB.

Apache Flink juga menyediakan sink untuk file dan soket, dan Anda dapat menerapkan sink kustom. Di antara beberapa wastafel yang didukung, berikut ini sering digunakan:

## Gunakan aliran data Kinesis
<a name="sinks-streams"></a>

Apache Flink memberikan informasi tentang [Konektor Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/) di dokumentasi Apache Flink.

Untuk contoh aplikasi yang menggunakan Kinesis data stream untuk input dan output, lihat [Tutorial: Mulai menggunakan DataStream API di Managed Service untuk Apache Flink](getting-started.md).

## Gunakan Apache Kafka dan Amazon Managed Streaming untuk Apache Kafka (MSK)
<a name="sinks-MSK"></a>

[Konektor Apache Flink Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink) memberikan dukungan ekstensif untuk menerbitkan data ke Apache Kafka dan Amazon MSK, termasuk jaminan persis sekali. Untuk mempelajari cara menulis ke Kafka, lihat [contoh Konektor Kafka dalam dokumentasi](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors) Apache Flink.

## Gunakan Amazon S3
<a name="sinks-s3"></a>

Anda dapat menggunakan Apache Flink `StreamingFileSink` untuk menulis objek ke bucket Amazon S3.

Untuk contoh tentang cara menulis objek ke S3, lihat [Contoh: Menulis ke ember Amazon S3](earlier.md#examples-s3). 

## Gunakan Firehose
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer`[Ini adalah wastafel Apache Flink yang andal dan dapat diskalakan untuk menyimpan output aplikasi menggunakan layanan Firehose.](https://docs.aws.amazon.com/firehose/latest/dev/) Bagian ini menjelaskan cara menyiapkan proyek Maven untuk membuat dan menggunakan `FlinkKinesisFirehoseProducer`.

**Topics**
+ [Buat `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [Contoh Kode `FlinkKinesisFirehoseProducer`](#sinks-firehose-sample)

### Buat `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

Contoh kode berikut mendemonstrasikan pembuatan `FlinkKinesisFirehoseProducer`:

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### Contoh Kode `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-sample"></a>

Contoh kode berikut menunjukkan cara membuat dan mengkonfigurasi `FlinkKinesisFirehoseProducer` dan mengirim data dari aliran data Apache Flink ke layanan Firehose.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Untuk tutorial lengkap tentang cara menggunakan wastafel Firehose, lihat. [Contoh: Menulis ke Firehose](earlier.md#get-started-exercise-fh)