

Terjemahan disediakan oleh mesin penerjemah. Jika konten terjemahan yang diberikan bertentangan dengan versi bahasa Inggris aslinya, utamakan versi bahasa Inggris.

# Tinjau komponen DataStream API
<a name="how-datastream"></a>

Aplikasi Apache Flink Anda menggunakan [Apache Flink DataStream API](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/overview/) untuk mengubah data dalam aliran data. 

Bagian ini menjelaskan berbagai komponen yang memindahkan, mengubah, dan melacak data:
+ [Gunakan konektor untuk memindahkan data dalam Layanan Terkelola untuk Apache Flink dengan API DataStream](how-connectors.md): Komponen ini memindahkan data antara aplikasi Anda dan sumber serta tujuan data eksternal.
+ [Mengubah data menggunakan operator di Managed Service untuk Apache Flink dengan API DataStream](how-operators.md): Komponen ini mengubah atau mengelompokkan elemen data dalam aplikasi Anda.
+ [Lacak peristiwa di Managed Service untuk Apache Flink menggunakan API DataStream](how-time.md): Topik ini menjelaskan bagaimana Managed Service for Apache Flink melacak peristiwa saat menggunakan API. DataStream 

# Gunakan konektor untuk memindahkan data dalam Layanan Terkelola untuk Apache Flink dengan API DataStream
<a name="how-connectors"></a>

Di Amazon Managed Service for Apache Flink DataStream API, *konektor* adalah komponen perangkat lunak yang memindahkan data masuk dan keluar dari Layanan Terkelola untuk aplikasi Apache Flink. Konektor adalah integrasi fleksibel yang memungkinkan Anda membaca dari file dan direktori. Konektor terdiri dari modul lengkap untuk berinteraksi dengan layanan Amazon dan sistem pihak ketiga.

Tipe konektor termasuk berikut ini:
+ [Tambahkan sumber data streaming](how-sources.md): Berikan data ke aplikasi Anda dari Kinesis data stream, file, atau sumber data lainnya.
+ [Tulis data menggunakan sink](how-sinks.md): Kirim data dari aplikasi Anda ke aliran data Kinesis, aliran Firehose, atau tujuan data lainnya.
+ [Gunakan I/O Asinkron](how-async.md): Menyediakan akses asinkron ke sumber data (seperti basis data) untuk memperkaya peristiwa aliran. 

## Konektor yang tersedia
<a name="how-connectors-list"></a>

Kerangka kerja Apache Flink berisi konektor untuk mengakses data dari berbagai sumber. Untuk informasi tentang konektor yang tersedia di kerangka kerja Apache Flink, lihat [Konektor](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/connectors/) di [Dokumentasi Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/).

**Awas**  
Jika Anda memiliki aplikasi yang berjalan di Flink 1.6, 1.8, 1.11 atau 1.13 dan ingin berjalan di Timur Tengah (UEA), Asia Pasifik (Hyderabad), Israel (Tel Aviv), Eropa (Zurich), Timur Tengah (UEA), Asia Pasifik (Melbourne) atau Asia Pasifik (Jakarta), Anda mungkin harus membangun kembali arsip aplikasi Anda dengan konektor yang diperbarui atau meningkatkan ke Flink 1.18.   
Konektor Apache Flink disimpan di repositori open source mereka sendiri. Jika Anda memutakhirkan ke versi 1.18 atau yang lebih baru, Anda harus memperbarui dependensi Anda. Untuk mengakses repositori konektor Apache Flink AWS , lihat. [flink-connector-aws](https://github.com/apache/flink-connector-aws)  
Sumber Kinesis sebelumnya `org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer` dihentikan dan mungkin dihapus dengan rilis Flink di masa depan. Gunakan [Sumber Kinesis](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-source) sebagai gantinya.  
Tidak ada kompatibilitas status antara `FlinkKinesisConsumer` dan`KinesisStreamsSource`. Untuk detailnya, lihat [Memigrasi pekerjaan yang ada ke Sumber Aliran Kinesis baru](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#migrating-existing-jobs-to-new-kinesis-streams-source-from-kinesis-consumer) di dokumentasi Apache Flink.  
 Berikut ini adalah pedoman yang direkomendasikan:   


**Peningkatan konektor**  

| Versi Flink | Konektor yang digunakan | Resolusi | 
| --- | --- | --- | 
| 1.19, 1.20 | Sumber Kinesis |  Saat memutakhirkan ke Managed Service untuk Apache Flink versi 1.19 dan 1.20, pastikan Anda menggunakan konektor sumber Kinesis Data Streams terbaru. Itu harus versi 5.0.0 atau yang lebih baru. Untuk informasi selengkapnya, lihat [Konektor Amazon Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/).  | 
| 1.19, 1.20 | Wastafel Kinesis |  Saat memutakhirkan ke Layanan Terkelola untuk Apache Flink versi 1.19 dan 1.20, pastikan Anda menggunakan konektor sink Kinesis Data Streams terbaru. Itu harus versi 5.0.0 atau yang lebih baru. Untuk informasi lebih lanjut, lihat [Kinesis Streams](https://nightlies.apache.org/flink/flink-docs-release-1.20/docs/connectors/datastream/kinesis/#kinesis-streams-sink) Sink.  | 
| 1.19, 1.20 | Sumber Streams DynamoDB |  Saat memutakhirkan ke Managed Service untuk Apache Flink versi 1.19 dan 1.20, pastikan Anda menggunakan konektor sumber DynamoDB Streams terbaru. Itu harus versi 5.0.0 atau yang lebih baru. Untuk informasi selengkapnya, lihat Konektor [Amazon DynamoDB](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/).  | 
| 1.19, 1.20 | DynamoDB Wastafel | Saat memutakhirkan ke Managed Service untuk Apache Flink versi 1.19 dan 1.20, pastikan Anda menggunakan konektor wastafel DynamoDB terbaru. Itu harus versi 5.0.0 atau yang lebih baru. Untuk informasi selengkapnya, lihat Konektor [Amazon DynamoDB](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/dynamodb/). | 
| 1.19, 1.20 | Wastafel Amazon SQS |  Saat memutakhirkan ke Managed Service untuk Apache Flink versi 1.19 dan 1.20, pastikan Anda menggunakan konektor wastafel Amazon SQS terbaru. Itu harus versi 5.0.0 atau yang lebih baru. Untuk informasi selengkapnya, lihat [Amazon SQS Sink.](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/sqs/)  | 
| 1.19, 1.20 | Layanan Dikelola Amazon untuk Prometheus Sink |  Saat memutakhirkan ke Layanan Terkelola untuk Apache Flink versi 1.19 dan 1.20, pastikan Anda menggunakan Layanan Terkelola Amazon terbaru untuk konektor wastafel Prometheus. Itu harus versi 1.0.0 atau yang lebih baru. Untuk informasi lebih lanjut, lihat [Prometheus](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/prometheus/) Sink.  | 

# Tambahkan sumber data streaming ke Layanan Terkelola untuk Apache Flink
<a name="how-sources"></a>

Apache Flink menyediakan konektor untuk membaca dari file, soket, koleksi, dan sumber kustom. Dalam kode aplikasi Anda, Anda menggunakan [sumber Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.15/dev/datastream_api.html#data-sources) untuk menerima data dari aliran. Bagian ini menjelaskan sumber yang tersedia untuk layanan Amazon.

## Gunakan aliran data Kinesis
<a name="input-streams"></a>

`KinesisStreamsSource`Menyediakan data streaming ke aplikasi Anda dari aliran data Amazon Kinesis. 

### Buat `KinesisStreamsSource`
<a name="input-streams-create"></a>

Contoh kode berikut mendemonstrasikan pembuatan `KinesisStreamsSource`:

```
// Configure the KinesisStreamsSource
Configuration sourceConfig = new Configuration();
sourceConfig.set(KinesisSourceConfigOptions.STREAM_INITIAL_POSITION, KinesisSourceConfigOptions.InitialPosition.TRIM_HORIZON); // This is optional, by default connector will read from LATEST

// Create a new KinesisStreamsSource to read from specified Kinesis Stream.
KinesisStreamsSource<String> kdsSource =
        KinesisStreamsSource.<String>builder()
                .setStreamArn("arn:aws:kinesis:us-east-1:123456789012:stream/test-stream")
                .setSourceConfig(sourceConfig)
                .setDeserializationSchema(new SimpleStringSchema())
                .setKinesisShardAssigner(ShardAssignerFactory.uniformShardAssigner()) // This is optional, by default uniformShardAssigner will be used.
                .build();
```

Untuk informasi selengkapnya tentang penggunaan`KinesisStreamsSource`, lihat Konektor [Amazon Kinesis Data](https://nightlies.apache.org/flink/flink-docs-stable/docs/connectors/datastream/kinesis/) Streams dalam [dokumentasi Apache Flink dan KinesisConnectors ](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) contoh publik kami di Github.

### Buat `KinesisStreamsSource` yang menggunakan konsumen EFO
<a name="input-streams-efo"></a>

`KinesisStreamsSource`Sekarang mendukung [Enhanced Fan-Out (EFO](https://ci.apache.org/projects/flink/flink-docs-release-1.13/docs/connectors/datastream/kinesis/)). 

Jika konsumen Kinesis menggunakan EFO, layanan Kinesis Data Streams memberikan bandwidth khusus miliknya sendiri, bukan meminta konsumen berbagi bandwidth aliran tetap dengan konsumen lain yang membaca dari aliran.

Untuk informasi selengkapnya tentang penggunaan EFO dengan konsumen Kinesis, [lihat FLIP-128: Enhanced Fan Out](https://cwiki.apache.org/confluence/display/FLINK/FLIP-128%3A+Enhanced+Fan+Out+for+AWS+Kinesis+Consumers) untuk Konsumen Kinesis. AWS 

Anda mengaktifkan konsumen EFO dengan mengatur parameter berikut pada konsumen Kinesis:
+ **READER\$1TYPE:** Setel parameter ini ke **EFO** agar aplikasi Anda menggunakan konsumen EFO untuk mengakses data Kinesis Data Stream. 
+ **EFO\$1CONSUMER\$1NAME:** Atur parameter ini ke nilai string yang unik di antara konsumen aliran ini. Menggunakan kembali nama konsumen di Kinesis Data Stream yang sama akan menyebabkan konsumen sebelumnya yang menggunakan nama tersebut dihentikan. 

Untuk mengonfigurasi `KinesisStreamsSource` untuk menggunakan EFO, tambahkan parameter berikut ke konsumen:

```
sourceConfig.set(KinesisSourceConfigOptions.READER_TYPE, KinesisSourceConfigOptions.ReaderType.EFO);
sourceConfig.set(KinesisSourceConfigOptions.EFO_CONSUMER_NAME, "my-flink-efo-consumer");
```

Untuk contoh aplikasi Managed Service for Apache Flink yang menggunakan konsumen EFO, lihat contoh Konektor [Kinesis publik](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KinesisConnectors) kami di Github.

## Gunakan Amazon MSK
<a name="input-msk"></a>

Sumber `KafkaSource` menyediakan data streaming ke aplikasi Anda dari topik Amazon MSK. 

### Buat `KafkaSource`
<a name="input-msk-create"></a>

Contoh kode berikut mendemonstrasikan pembuatan `KafkaSource`:

```
KafkaSource<String> source = KafkaSource.<String>builder()
    .setBootstrapServers(brokers)
    .setTopics("input-topic")
    .setGroupId("my-group")
    .setStartingOffsets(OffsetsInitializer.earliest())
    .setValueOnlyDeserializer(new SimpleStringSchema())
    .build();

env.fromSource(source, WatermarkStrategy.noWatermarks(), "Kafka Source");
```

Untuk informasi selengkapnya tentang cara menggunakan `KafkaSource`, lihat [Replikasi MSK](earlier.md#example-msk).

# Menulis data menggunakan sink di Managed Service untuk Apache Flink
<a name="how-sinks"></a>

Dalam kode aplikasi Anda, Anda dapat menggunakan konektor [sink Apache Flink](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/overview/) untuk menulis ke sistem eksternal, termasuk AWS layanan, seperti Kinesis Data Streams dan DynamoDB.

Apache Flink juga menyediakan sink untuk file dan soket, dan Anda dapat menerapkan sink kustom. Di antara beberapa wastafel yang didukung, berikut ini sering digunakan:

## Gunakan aliran data Kinesis
<a name="sinks-streams"></a>

Apache Flink memberikan informasi tentang [Konektor Kinesis Data Streams](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kinesis/) di dokumentasi Apache Flink.

Untuk contoh aplikasi yang menggunakan Kinesis data stream untuk input dan output, lihat [Tutorial: Mulai menggunakan DataStream API di Managed Service untuk Apache Flink](getting-started.md).

## Gunakan Apache Kafka dan Amazon Managed Streaming untuk Apache Kafka (MSK)
<a name="sinks-MSK"></a>

[Konektor Apache Flink Kafka](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/connectors/datastream/kafka/#kafka-sink) memberikan dukungan ekstensif untuk menerbitkan data ke Apache Kafka dan Amazon MSK, termasuk jaminan persis sekali. Untuk mempelajari cara menulis ke Kafka, lihat [contoh Konektor Kafka dalam dokumentasi](https://github.com/aws-samples/amazon-managed-service-for-apache-flink-examples/tree/main/java/KafkaConnectors) Apache Flink.

## Gunakan Amazon S3
<a name="sinks-s3"></a>

Anda dapat menggunakan Apache Flink `StreamingFileSink` untuk menulis objek ke bucket Amazon S3.

Untuk contoh tentang cara menulis objek ke S3, lihat [Contoh: Menulis ke ember Amazon S3](earlier.md#examples-s3). 

## Gunakan Firehose
<a name="sinks-firehose"></a>

`FlinkKinesisFirehoseProducer`[Ini adalah wastafel Apache Flink yang andal dan dapat diskalakan untuk menyimpan output aplikasi menggunakan layanan Firehose.](https://docs.aws.amazon.com/firehose/latest/dev/) Bagian ini menjelaskan cara menyiapkan proyek Maven untuk membuat dan menggunakan `FlinkKinesisFirehoseProducer`.

**Topics**
+ [Buat `FlinkKinesisFirehoseProducer`](#sinks-firehose-create)
+ [Contoh Kode `FlinkKinesisFirehoseProducer`](#sinks-firehose-sample)

### Buat `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-create"></a>

Contoh kode berikut mendemonstrasikan pembuatan `FlinkKinesisFirehoseProducer`:

```
Properties outputProperties = new Properties();
outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName, new SimpleStringSchema(), outputProperties);
```

### Contoh Kode `FlinkKinesisFirehoseProducer`
<a name="sinks-firehose-sample"></a>

Contoh kode berikut menunjukkan cara membuat dan mengkonfigurasi `FlinkKinesisFirehoseProducer` dan mengirim data dari aliran data Apache Flink ke layanan Firehose.

```
 
package com.amazonaws.services.kinesisanalytics;

import com.amazonaws.services.kinesisanalytics.flink.connectors.config.ProducerConfigConstants;
import com.amazonaws.services.kinesisanalytics.flink.connectors.producer.FlinkKinesisFirehoseProducer;
import com.amazonaws.services.kinesisanalytics.runtime.KinesisAnalyticsRuntime;
import org.apache.flink.api.common.serialization.SimpleStringSchema;
import org.apache.flink.streaming.api.datastream.DataStream;
import org.apache.flink.streaming.api.environment.StreamExecutionEnvironment;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisConsumer;
import org.apache.flink.streaming.connectors.kinesis.FlinkKinesisProducer;

import org.apache.flink.streaming.connectors.kinesis.config.ConsumerConfigConstants;

import java.io.IOException;
import java.util.Map;
import java.util.Properties;

public class StreamingJob {

	private static final String region = "us-east-1";
	private static final String inputStreamName = "ExampleInputStream";
	private static final String outputStreamName = "ExampleOutputStream";

	private static DataStream<String> createSourceFromStaticConfig(StreamExecutionEnvironment env) {
		Properties inputProperties = new Properties();
		inputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);
		inputProperties.setProperty(ConsumerConfigConstants.STREAM_INITIAL_POSITION, "LATEST");

		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(), inputProperties));
	}

	private static DataStream<String> createSourceFromApplicationProperties(StreamExecutionEnvironment env)
			throws IOException {
		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		return env.addSource(new FlinkKinesisConsumer<>(inputStreamName, new SimpleStringSchema(),
				applicationProperties.get("ConsumerConfigProperties")));
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromStaticConfig() {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Properties outputProperties = new Properties();
		outputProperties.setProperty(ConsumerConfigConstants.AWS_REGION, region);

		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(), outputProperties);
		ProducerConfigConstants config = new ProducerConfigConstants();
		return sink;
	}

	private static FlinkKinesisFirehoseProducer<String> createFirehoseSinkFromApplicationProperties() throws IOException {
		/*
		 * com.amazonaws.services.kinesisanalytics.flink.connectors.config.
		 * ProducerConfigConstants
		 * lists of all of the properties that firehose sink can be configured with.
		 */

		Map<String, Properties> applicationProperties = KinesisAnalyticsRuntime.getApplicationProperties();
		FlinkKinesisFirehoseProducer<String> sink = new FlinkKinesisFirehoseProducer<>(outputStreamName,
				new SimpleStringSchema(),
				applicationProperties.get("ProducerConfigProperties"));
		return sink;
	}

	public static void main(String[] args) throws Exception {
		// set up the streaming execution environment
		final StreamExecutionEnvironment env = StreamExecutionEnvironment.getExecutionEnvironment();

		/*
		 * if you would like to use runtime configuration properties, uncomment the
		 * lines below
		 * DataStream<String> input = createSourceFromApplicationProperties(env);
		 */

		DataStream<String> input = createSourceFromStaticConfig(env);

		// Kinesis Firehose sink
		input.addSink(createFirehoseSinkFromStaticConfig());

		// If you would like to use runtime configuration properties, uncomment the
		// lines below
		// input.addSink(createFirehoseSinkFromApplicationProperties());

		env.execute("Flink Streaming Java API Skeleton");
	}
}
```

Untuk tutorial lengkap tentang cara menggunakan wastafel Firehose, lihat. [Contoh: Menulis ke Firehose](earlier.md#get-started-exercise-fh)

# Gunakan Asynchronous I/O di Managed Service untuk Apache Flink
<a name="how-async"></a>

 I/O Operator asinkron memperkaya data aliran menggunakan sumber data eksternal seperti database. Layanan Terkelola untuk Apache Flink memperkaya peristiwa streaming secara asinkron sehingga permintaan dapat dikumpulkan untuk efisiensi yang lebih besar. 

Untuk informasi selengkapnya, lihat [Asynchronous I/O](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/asyncio/) di Apache Flink Documentation.

# Mengubah data menggunakan operator di Managed Service untuk Apache Flink dengan API DataStream
<a name="how-operators"></a>

*Untuk mengubah data masuk dalam Layanan Terkelola untuk Apache Flink, Anda menggunakan operator Apache Flink.* Operator Apache Flink mengubah satu atau beberapa aliran data menjadi aliran data baru. Aliran data baru berisi data yang dimodifikasi dari aliran data asli. Apache Flink menyediakan lebih dari 25 operator pemrosesan aliran yang dibangun sebelumnya. Untuk informasi selengkapnya, lihat [Operator](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/operators/overview/) di Dokumentasi Apache Flink.

**Topics**
+ [Gunakan operator transformasi](#how-operators-transform)
+ [Gunakan operator agregasi](#how-operators-agg)

## Gunakan operator transformasi
<a name="how-operators-transform"></a>

Berikut adalah contoh transformasi teks sederhana pada salah satu bidang aliran data JSON. 

Kode ini membuat aliran data yang diubah. Aliran data baru memiliki data yang sama dengan aliran asli, dengan string "` Company`" yang ditambahkan ke isi bidang `TICKER`.

```
DataStream<ObjectNode> output = input.map(
    new MapFunction<ObjectNode, ObjectNode>() {
        @Override
        public ObjectNode map(ObjectNode value) throws Exception {
            return value.put("TICKER", value.get("TICKER").asText() + " Company");
        }
    }
);
```

## Gunakan operator agregasi
<a name="how-operators-agg"></a>

Berikut adalah contoh operator agregasi. Kode membuat aliran data agregat. Operator membuat jendela tumbling 5 detik dan menampilkan jumlah dari nilai `PRICE` untuk catatan di jendela dengan nilai `TICKER` yang sama.

```
DataStream<ObjectNode> output = input.keyBy(node -> node.get("TICKER").asText())
    .window(TumblingProcessingTimeWindows.of(Time.seconds(5)))
    .reduce((node1, node2) -> {
        double priceTotal = node1.get("PRICE").asDouble() + node2.get("PRICE").asDouble();
        node1.replace("PRICE", JsonNodeFactory.instance.numberNode(priceTotal));
    return node1;
});
```

Untuk contoh kode lainnya, lihat[Contoh untuk membuat dan bekerja dengan Managed Service untuk aplikasi Apache Flink](examples-collapsibles.md). 

# Lacak peristiwa di Managed Service untuk Apache Flink menggunakan API DataStream
<a name="how-time"></a>

Layanan Terkelola untuk Apache Flink melacak peristiwa menggunakan stempel waktu berikut:
+ **Processing Time** (Waktu Pemrosesan): Mengacu pada waktu sistem mesin yang menjalankan operasi masing-masing.
+ **Event Time** (Waktu Peristiwa): Mengacu pada waktu setiap peristiwa individu terjadi pada perangkat produksinya.
+ **Waktu Tertelan:** Mengacu pada waktu peristiwa memasuki Layanan Terkelola untuk layanan Apache Flink.

Anda mengatur waktu yang digunakan oleh lingkungan streaming menggunakan`setStreamTimeCharacteristic`. 

```
env.setStreamTimeCharacteristic(TimeCharacteristic.ProcessingTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.IngestionTime);
env.setStreamTimeCharacteristic(TimeCharacteristic.EventTime);
```

Untuk informasi selengkapnya tentang stempel waktu, lihat [Membuat Tanda Air di dokumentasi](https://nightlies.apache.org/flink/flink-docs-release-1.19/docs/dev/datastream/event-time/generating_watermarks/) Apache Flink.